ashwinagalcha-ps opened a new issue, #11468:
URL: https://github.com/apache/hudi/issues/11468

   When using Kafka + Debezium + Streamer, we are able to write data and the 
job works fine. However, with the SqlQueryBasedTransformer added, the job 
writes data to S3 with the new field but ultimately fails.
   
   Below are the Hudi Deltastreamer job configs:
   
   ```
   "--table-type", "COPY_ON_WRITE",
   "--source-class", "org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource",
   "--transformer-class", "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
   "--hoodie-conf", "hoodie.streamer.transformer.sql=SELECT *, extract(year from a.created_at) as year FROM <SRC> a",
   "--source-ordering-field", output["source_ordering_field"],
   "--target-base-path", f"s3a://{env_params['deltastreamer_bucket']}/{db_name}/{schema}/{output['table_name']}/",
   "--target-table", output["table_name"],
   "--auto.offset.reset=earliest",
   "--props", properties_file,
   "--payload-class", "org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload",
   "--enable-hive-sync",
   "--hoodie-conf", "hoodie.datasource.hive_sync.mode=hms",
   "--hoodie-conf", "hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true",
   "--hoodie-conf", f"hoodie.deltastreamer.source.kafka.topic={connector_name}.{schema}.{output['table_name']}",
   "--hoodie-conf", f"schema.registry.url={env_params['schema_registry_url']}",
   "--hoodie-conf", f"hoodie.deltastreamer.schemaprovider.registry.url={env_params['schema_registry_url']}/subjects/{connector_name}.{schema}.{output['table_name']}-value/versions/latest",
   "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer",
   "--hoodie-conf", "hoodie.datasource.hive_sync.use_jdbc=false",
   "--hoodie-conf", f"hoodie.datasource.hive_sync.database={output['hive_database']}",
   "--hoodie-conf", f"hoodie.datasource.hive_sync.table={output['table_name']}",
   "--hoodie-conf", "hoodie.datasource.hive_sync.metastore.uris=",
   "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
   "--hoodie-conf", "hoodie.datasource.hive_sync.support_timestamp=true",
   "--hoodie-conf", "hoodie.deltastreamer.source.kafka.maxEvents=100000",
   "--hoodie-conf", f"hoodie.datasource.write.recordkey.field={output['record_key']}",
   "--hoodie-conf", f"hoodie.datasource.write.precombine.field={output['precombine_field']}",
   "--hoodie-conf", f"hoodie.datasource.hive_sync.glue_database={output['hive_database']}",
   "--continuous"
   ```
   
   Properties file:
   ```
   bootstrap.servers=<host:port>
   auto.offset.reset=earliest
   schema.registry.url=http://host:8081
   ```
   
   **Expected behavior**: The SqlQueryBasedTransformer should add a new field 
(year), extracted from created_at, to the target Hudi table, and the job 
should keep running without failing.
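
To make the intent of the configured query concrete, here is a minimal sketch of how we understand the `<SRC>` placeholder to be handled: it is replaced by a temporary view name before the query is passed to Spark SQL. The `HOODIE_SRC_TMP_TABLE_<uuid>` name shape is taken from the stack trace below. The helper names `make_tmp_table_name` and `render_transformer_sql` are hypothetical and only illustrate the substitution; they are not Hudi APIs.

```python
import uuid

# Placeholder used in hoodie.streamer.transformer.sql (from the config above).
SRC_PATTERN = "<SRC>"


def make_tmp_table_name() -> str:
    """Build a temp view name shaped like the one in the stack trace.

    Assumption: the suffix is a random UUID with dashes replaced by underscores.
    """
    return "HOODIE_SRC_TMP_TABLE_" + str(uuid.uuid4()).replace("-", "_")


def render_transformer_sql(sql_template: str, tmp_table: str) -> str:
    """Replace every <SRC> placeholder with the temp view name (hypothetical helper)."""
    return sql_template.replace(SRC_PATTERN, tmp_table)


sql_template = "SELECT *, extract(year from a.created_at) as year FROM <SRC> a"
rendered = render_transformer_sql(sql_template, make_tmp_table_name())
print(rendered)
```

The rendered query can only resolve `a.created_at` if the temporary view actually exposes a `created_at` column.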
   
   **Environment Description**
   
   * Hudi version : 0.14.0
   
   * Spark version : 3.4.1
   
   * Hadoop version : 3.3.4
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   * Base image & jars:
   
`public.ecr.aws/ocean-spark/spark:platform-3.4.1-hadoop-3.3.4-java-11-scala-2.12-python-3.10-gen21`
   
   
`https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/0.14.0/hudi-spark3.4-bundle_2.12-0.14.0.jar`
   
`https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/0.14.0/hudi-utilities-bundle_2.12-0.14.0.jar`
   
   **Stacktrace**
   
   ```
   2024-06-14T14:16:17.562738557Z 24/06/14 14:16:17 ERROR HoodieStreamer: Shutting down delta-sync due to exception
   2024-06-14T14:16:17.562785897Z org.apache.hudi.utilities.exception.HoodieTransformExecutionException: Failed to apply sql query based transformer
   2024-06-14T14:16:17.562797467Z       at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:68)
   2024-06-14T14:16:17.562805097Z       at org.apache.hudi.utilities.transform.ChainedTransformer.apply(ChainedTransformer.java:105)
   2024-06-14T14:16:17.562812197Z       at org.apache.hudi.utilities.streamer.StreamSync.lambda$fetchFromSource$0(StreamSync.java:530)
   2024-06-14T14:16:17.562819517Z       at org.apache.hudi.common.util.Option.map(Option.java:108)
   2024-06-14T14:16:17.562826327Z       at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:530)
   2024-06-14T14:16:17.562836838Z       at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
   2024-06-14T14:16:17.562844648Z       at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
   2024-06-14T14:16:17.562852958Z       at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:757)
   2024-06-14T14:16:17.562860358Z       at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
   2024-06-14T14:16:17.562868059Z       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   2024-06-14T14:16:17.562875549Z       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   2024-06-14T14:16:17.562883938Z       at java.base/java.lang.Thread.run(Thread.java:829)
   2024-06-14T14:16:17.562893178Z Caused by: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITHOUT_SUGGESTION] A column or function parameter with name `a`.`created_at` cannot be resolved. ; line 1 pos 10;
   2024-06-14T14:16:17.562900609Z 'Project ['a.created_at AS year#0]
   2024-06-14T14:16:17.562907429Z +- SubqueryAlias a
   2024-06-14T14:16:17.562914209Z    +- SubqueryAlias hoodie_src_tmp_table_42422003_d19b_4f7b_8f34_51304c6aca57
   2024-06-14T14:16:17.562920689Z       +- View (`HOODIE_SRC_TMP_TABLE_42422003_d19b_4f7b_8f34_51304c6aca57`, [])
   2024-06-14T14:16:17.562927469Z          +- LocalRelation <empty>
   2024-06-14T14:16:17.562933689Z 
   2024-06-14T14:16:17.562939999Z       at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:221)
   2024-06-14T14:16:17.562947379Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:143)
   2024-06-14T14:16:17.562965380Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:258)
   2024-06-14T14:16:17.562972330Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
   2024-06-14T14:16:17.562978850Z       at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
   2024-06-14T14:16:17.562985980Z       at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
   2024-06-14T14:16:17.562993190Z       at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
   2024-06-14T14:16:17.563000240Z       at scala.collection.Iterator.foreach(Iterator.scala:943)
   2024-06-14T14:16:17.563007110Z       at scala.collection.Iterator.foreach$(Iterator.scala:943)
   2024-06-14T14:16:17.563014090Z       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   2024-06-14T14:16:17.563021020Z       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   2024-06-14T14:16:17.563027680Z       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   2024-06-14T14:16:17.563035050Z       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   2024-06-14T14:16:17.563041790Z       at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
   2024-06-14T14:16:17.563048220Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4(CheckAnalysis.scala:256)
   2024-06-14T14:16:17.563055380Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4$adapted(CheckAnalysis.scala:256)
   2024-06-14T14:16:17.563062311Z       at scala.collection.immutable.Stream.foreach(Stream.scala:533)
   2024-06-14T14:16:17.563069851Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1(CheckAnalysis.scala:256)
   2024-06-14T14:16:17.563077001Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1$adapted(CheckAnalysis.scala:163)
   2024-06-14T14:16:17.563084861Z       at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
   2024-06-14T14:16:17.563091541Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:163)
   2024-06-14T14:16:17.563098181Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:160)
   2024-06-14T14:16:17.563105001Z       at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
   2024-06-14T14:16:17.563111721Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:156)
   2024-06-14T14:16:17.563118621Z       at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:146)
   2024-06-14T14:16:17.563125082Z       at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
   2024-06-14T14:16:17.563138102Z       at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
   2024-06-14T14:16:17.563145552Z       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
   2024-06-14T14:16:17.563213833Z       at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
   2024-06-14T14:16:17.563224434Z       at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
   2024-06-14T14:16:17.563231294Z       at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   2024-06-14T14:16:17.563238014Z       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
   2024-06-14T14:16:17.563245154Z       at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
   2024-06-14T14:16:17.563258994Z       at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
   2024-06-14T14:16:17.563266284Z       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   2024-06-14T14:16:17.563273604Z       at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
   2024-06-14T14:16:17.563280974Z       at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
   2024-06-14T14:16:17.563288444Z       at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
   2024-06-14T14:16:17.563295604Z       at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
   2024-06-14T14:16:17.563302205Z       at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
   2024-06-14T14:16:17.563309075Z       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   2024-06-14T14:16:17.563315785Z       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
   2024-06-14T14:16:17.563322805Z       at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
   2024-06-14T14:16:17.563329675Z       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   2024-06-14T14:16:17.563336375Z       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
   2024-06-14T14:16:17.563343275Z       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
   2024-06-14T14:16:17.563350755Z       at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:64)
   ```
   
   

