soumilshah1995 commented on issue #10499:
URL: https://github.com/apache/hudi/issues/10499#issuecomment-1892807362
Also tried:
```
spark-submit \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
  --properties-file spark-config.properties \
  --master 'local[*]' \
  --executor-memory 1g \
  /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
  --table-type COPY_ON_WRITE \
  --op UPSERT \
  --source-ordering-field ts \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --target-base-path file:///Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/hudidb/orders \
  --target-table orders \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --transformer-class 'org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.SqlQueryBasedTransformer' \
  --props hudi_tbl.props
```
hudi_tbl.props:
```
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.precombine.field=ts
bootstrap.servers=localhost:7092
auto.offset.reset=earliest
hoodie.streamer.source.kafka.topic=orders_complex
hoodie.streamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
schema.registry.url=http://localhost:8081/
hoodie.streamer.schemaprovider.registry.schemaconverter=
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders_complex-value/versions/latest
hoodie.streamer.transformer.sql=SELECT order_id, name, order_value, priority, order_date, ts, customer.customer_id AS customer_customer_id FROM <SRC> a
#hoodie.streamer.transformer.sql=SELECT * FROM <SRC> a
```
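The AnalysisException further down hints at a likely fix: because `FlatteningTransformer` runs before `SqlQueryBasedTransformer` in the chain, the nested field has already been renamed to `customer_customer_id` by the time the SQL runs, so the dotted path `customer.customer_id` no longer resolves. An untested sketch of the adjusted property:

```
# Sketch (untested): reference the flattened column name, since the
# FlatteningTransformer has already renamed customer.customer_id.
hoodie.streamer.transformer.sql=SELECT order_id, name, order_value, priority, order_date, ts, customer_customer_id FROM <SRC> a
```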
Error:
```
24/01/15 16:58:07 INFO Metadata: [Consumer clientId=consumer-spark-executor-null-2, groupId=spark-executor-null] Cluster ID: OfY2BqB-RFiPOfaF84AoGw
24/01/15 16:58:07 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1235 bytes result sent to driver
24/01/15 16:58:07 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 137 ms on soumils-mbp (executor driver) (1/1)
24/01/15 16:58:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
24/01/15 16:58:07 INFO DAGScheduler: ResultStage 0 (isEmpty at AvroConversionUtils.scala:99) finished in 0.446 s
24/01/15 16:58:07 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
24/01/15 16:58:07 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
24/01/15 16:58:07 INFO DAGScheduler: Job 0 finished: isEmpty at AvroConversionUtils.scala:99, took 0.464518 s
24/01/15 16:58:08 INFO FlatteningTransformer: Registering tmp table : HUDI_SRC_TMP_TABLE_40151fe5_8414_4137_a1ab_02d5d1946d83
24/01/15 16:58:08 INFO SqlQueryBasedTransformer: Registering tmp table : HOODIE_SRC_TMP_TABLE_c72fc875_0045_49ba_bfc0_31c2e4ccfcdb
24/01/15 16:58:08 INFO StreamSync: Shutting down embedded timeline server
24/01/15 16:58:08 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/01/15 16:58:08 INFO SparkUI: Stopped Spark web UI at http://soumils-mbp:8090
24/01/15 16:58:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/01/15 16:58:08 INFO MemoryStore: MemoryStore cleared
24/01/15 16:58:08 INFO BlockManager: BlockManager stopped
24/01/15 16:58:08 INFO BlockManagerMaster: BlockManagerMaster stopped
24/01/15 16:58:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/01/15 16:58:08 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.utilities.exception.HoodieTransformExecutionException: Failed to apply sql query based transformer
    at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:68)
    at org.apache.hudi.utilities.transform.ChainedTransformer.apply(ChainedTransformer.java:105)
    at org.apache.hudi.utilities.streamer.StreamSync.lambda$fetchFromSource$0(StreamSync.java:530)
    at org.apache.hudi.common.util.Option.map(Option.java:108)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:530)
    at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:840)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `customer`.`customer_id` cannot be resolved. Did you mean one of the following? [`a`.`customer_customer_id`, `a`.`order_id`, `a`.`order_date`, `a`.`name`, `a`.`order_value`].; line 1 pos 62;
'Project [order_id#21, name#22, order_value#23, priority#24, order_date#25, ts#26, 'customer.customer_id AS customer_customer_id#36]
+- SubqueryAlias a
   +- SubqueryAlias hoodie_src_tmp_table_c72fc875_0045_49ba_bfc0_31c2e4ccfcdb
      +- View (`HOODIE_SRC_TMP_TABLE_c72fc875_0045_49ba_bfc0_31c2e4ccfcdb`, [order_id#21,name#22,order_value#23,priority#24,order_date#25,ts#26,customer_customer_id#27])
         +- Project [order_id#7 AS order_id#21, name#8 AS name#22, order_value#9 AS order_value#23, priority#10 AS priority#24, order_date#11 AS order_date#25, ts#12 AS ts#26, customer#13.customer_id AS customer_customer_id#27]
            +- SubqueryAlias hudi_src_tmp_table_40151fe5_8414_4137_a1ab_02d5d1946d83
               +- View (`HUDI_SRC_TMP_TABLE_40151fe5_8414_4137_a1ab_02d5d1946d83`, [order_id#7,name#8,order_value#9,priority#10,order_date#11,ts#12,customer#13])
                  +- LogicalRDD [order_id#7, name#8, order_value#9, priority#10, order_date#11, ts#12, customer#13], false
    at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:221)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:143)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:258)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4(CheckAnalysis.scala:256)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4$adapted(CheckAnalysis.scala:256)
    at scala.collection.immutable.Stream.foreach(Stream.scala:533)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1(CheckAnalysis.scala:256)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1$adapted(CheckAnalysis.scala:163)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:163)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:160)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:156)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:146)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:64)
    ... 23 more
24/01/15 16:58:08 INFO ShutdownHookManager: Shutdown hook called
24/01/15 16:58:08 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-992e8443-ead9-4f3d-8f5e-da9329fb95c0
24/01/15 16:58:08 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-225adb28-c729-49d9-84c6-c58d5b9f27b0
soumilshah@Soumils-MBP E11 %
```
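For context, here is a minimal plain-Python sketch (not Hudi code; the `flatten` helper is a hypothetical stand-in for what `FlatteningTransformer` does to column names) of why the chained transformers clash: once nested fields are flattened into underscore-joined names, the original dotted path is gone for any later SQL step.

```python
# Hypothetical stand-in for FlatteningTransformer's naming convention:
# nested fields become parent_child, so "customer.customer_id" disappears.
def flatten(record, prefix=""):
    """Flatten nested dicts into one level with underscore-joined keys."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}_"))
        else:
            flat[name] = value
    return flat

row = {"order_id": 1, "ts": 100, "customer": {"customer_id": 42}}
flat_row = flatten(row)
print(flat_row)  # {'order_id': 1, 'ts': 100, 'customer_customer_id': 42}

# A SQL step applied after flattening must use the flattened name:
assert "customer_customer_id" in flat_row      # exists
assert "customer" not in flat_row              # dotted parent is gone
```

This matches the analyzer's suggestion in the error: `` `a`.`customer_customer_id` `` is the column that actually exists after the first transformer.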
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]