ashwinagalcha-ps opened a new issue, #11468:
URL: https://github.com/apache/hudi/issues/11468
When using Kafka + Debezium + Streamer, we are able to write data and the
job runs fine. But once the SqlQueryBasedTransformer is enabled, the job
still writes data to S3 with the new field, yet it ultimately fails.
Below are the Hudi DeltaStreamer job configs:
```"--table-type", "COPY_ON_WRITE",
"--source-class",
"org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource",
"--transformer-class",
"org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
"--hoodie-conf", "hoodie.streamer.transformer.sql=SELECT *, extract(year
from a.created_at) as year FROM <SRC> a",
"--source-ordering-field", output["source_ordering_field"],
"--target-base-path",
f"s3a://{env_params['deltastreamer_bucket']}/{db_name}/{schema}/{output['table_name']}/",
"--target-table", output["table_name"],
"--auto.offset.reset=earliest
"--props", properties_file,
"--payload-class",
"org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload",
"--enable-hive-sync",
"--hoodie-conf", "hoodie.datasource.hive_sync.mode=hms",
"--hoodie-conf",
"hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true",
"--hoodie-conf",
f"hoodie.deltastreamer.source.kafka.topic={connector_name}.{schema}.{output['table_name']}",
"--hoodie-conf", f"schema.registry.url={env_params['schema_registry_url']}",
"--hoodie-conf",
f"hoodie.deltastreamer.schemaprovider.registry.url={env_params['schema_registry_url']}/subjects/{connector_name}.{schema}.{output['table_name']}-value/versions/latest",
"--hoodie-conf",
"hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
"--hoodie-conf", "hoodie.datasource.hive_sync.use_jdbc=false",
"--hoodie-conf",
f"hoodie.datasource.hive_sync.database={output['hive_database']}",
"--hoodie-conf",
f"hoodie.datasource.hive_sync.table={output['table_name']}",
"--hoodie-conf", "hoodie.datasource.hive_sync.metastore.uris=",
"--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
"--hoodie-conf", "hoodie.datasource.hive_sync.support_timestamp=true",
"--hoodie-conf", "hoodie.deltastreamer.source.kafka.maxEvents=100000",
"--hoodie-conf",
f"hoodie.datasource.write.recordkey.field={output['record_key']}",
"--hoodie-conf",
f"hoodie.datasource.write.precombine.field={output['precombine_field']}",
"--hoodie-conf",
f"hoodie.datasource.hive_sync.glue_database={output['hive_database']}",
"--continuous"```
Properties file:
```
bootstrap.servers=<host:port>
auto.offset.reset=earliest
schema.registry.url=http://host:8081
```
**Expected behavior**: the new field (`year`) should be extracted into the
target Hudi table with the help of SqlQueryBasedTransformer.
**Environment Description**
* Hudi version : 0.14.0
* Spark version : 3.4.1
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
* Base image & jars:
  `public.ecr.aws/ocean-spark/spark:platform-3.4.1-hadoop-3.3.4-java-11-scala-2.12-python-3.10-gen21`
  `https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar`
  `https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/0.14.0/hudi-utilities-bundle_2.12-0.14.0.jar`
**Stacktrace**
```
2024-06-14T14:16:17.562738557Z 24/06/14 14:16:17 ERROR HoodieStreamer:
Shutting down delta-sync due to exception
2024-06-14T14:16:17.562785897Z
org.apache.hudi.utilities.exception.HoodieTransformExecutionException: Failed
to apply sql query based transformer
2024-06-14T14:16:17.562797467Z at
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:68)
2024-06-14T14:16:17.562805097Z at
org.apache.hudi.utilities.transform.ChainedTransformer.apply(ChainedTransformer.java:105)
2024-06-14T14:16:17.562812197Z at
org.apache.hudi.utilities.streamer.StreamSync.lambda$fetchFromSource$0(StreamSync.java:530)
2024-06-14T14:16:17.562819517Z at
org.apache.hudi.common.util.Option.map(Option.java:108)
2024-06-14T14:16:17.562826327Z at
org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:530)
2024-06-14T14:16:17.562836838Z at
org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
2024-06-14T14:16:17.562844648Z at
org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
2024-06-14T14:16:17.562852958Z at
org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
2024-06-14T14:16:17.562860358Z at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
2024-06-14T14:16:17.562868059Z at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-06-14T14:16:17.562875549Z at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-06-14T14:16:17.562883938Z at
java.base/java.lang.Thread.run(Thread.java:829)
2024-06-14T14:16:17.562893178Z Caused by:
org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITHOUT_SUGGESTION]
A column or function parameter with name `a`.`created_at` cannot be resolved. ;
line 1 pos 10;
2024-06-14T14:16:17.562900609Z 'Project ['a.created_at AS year#0]
2024-06-14T14:16:17.562907429Z +- SubqueryAlias a
2024-06-14T14:16:17.562914209Z +- SubqueryAlias
hoodie_src_tmp_table_42422003_d19b_4f7b_8f34_51304c6aca57
2024-06-14T14:16:17.562920689Z +- View
(`HOODIE_SRC_TMP_TABLE_42422003_d19b_4f7b_8f34_51304c6aca57`, [])
2024-06-14T14:16:17.562927469Z +- LocalRelation <empty>
2024-06-14T14:16:17.562933689Z
2024-06-14T14:16:17.562939999Z at
org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:221)
2024-06-14T14:16:17.562947379Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:143)
2024-06-14T14:16:17.562965380Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:258)
2024-06-14T14:16:17.562972330Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
2024-06-14T14:16:17.562978850Z at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
2024-06-14T14:16:17.562985980Z at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
2024-06-14T14:16:17.562993190Z at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
2024-06-14T14:16:17.563000240Z at
scala.collection.Iterator.foreach(Iterator.scala:943)
2024-06-14T14:16:17.563007110Z at
scala.collection.Iterator.foreach$(Iterator.scala:943)
2024-06-14T14:16:17.563014090Z at
scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
2024-06-14T14:16:17.563021020Z at
scala.collection.IterableLike.foreach(IterableLike.scala:74)
2024-06-14T14:16:17.563027680Z at
scala.collection.IterableLike.foreach$(IterableLike.scala:73)
2024-06-14T14:16:17.563035050Z at
scala.collection.AbstractIterable.foreach(Iterable.scala:56)
2024-06-14T14:16:17.563041790Z at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
2024-06-14T14:16:17.563048220Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4(CheckAnalysis.scala:256)
2024-06-14T14:16:17.563055380Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4$adapted(CheckAnalysis.scala:256)
2024-06-14T14:16:17.563062311Z at
scala.collection.immutable.Stream.foreach(Stream.scala:533)
2024-06-14T14:16:17.563069851Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1(CheckAnalysis.scala:256)
2024-06-14T14:16:17.563077001Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1$adapted(CheckAnalysis.scala:163)
2024-06-14T14:16:17.563084861Z at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
2024-06-14T14:16:17.563091541Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:163)
2024-06-14T14:16:17.563098181Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:160)
2024-06-14T14:16:17.563105001Z at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
2024-06-14T14:16:17.563111721Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:156)
2024-06-14T14:16:17.563118621Z at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:146)
2024-06-14T14:16:17.563125082Z at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
2024-06-14T14:16:17.563138102Z at
org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
2024-06-14T14:16:17.563145552Z at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
2024-06-14T14:16:17.563213833Z at
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
2024-06-14T14:16:17.563224434Z at
org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
2024-06-14T14:16:17.563231294Z at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
2024-06-14T14:16:17.563238014Z at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
2024-06-14T14:16:17.563245154Z at
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
2024-06-14T14:16:17.563258994Z at
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
2024-06-14T14:16:17.563266284Z at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
2024-06-14T14:16:17.563273604Z at
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
2024-06-14T14:16:17.563280974Z at
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
2024-06-14T14:16:17.563288444Z at
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
2024-06-14T14:16:17.563295604Z at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
2024-06-14T14:16:17.563302205Z at
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
2024-06-14T14:16:17.563309075Z at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
2024-06-14T14:16:17.563315785Z at
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
2024-06-14T14:16:17.563322805Z at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
2024-06-14T14:16:17.563329675Z at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
2024-06-14T14:16:17.563336375Z at
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
2024-06-14T14:16:17.563343275Z at
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
2024-06-14T14:16:17.563350755Z at
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:64)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]