[ 
https://issues.apache.org/jira/browse/HUDI-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754654#comment-17754654
 ] 

sivabalan narayanan commented on HUDI-6693:
-------------------------------------------

spark3.2.1 streaming writes. 
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
import org.apache.spark.sql.types.{IntegerType, StringType, LongType, 
StructType}
import java.time.LocalDateTime
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode;import 
org.apache.spark.sql.streaming.Triggerval inputDf = 
spark.read.format("parquet").load("/tmp/parquet_ny/*")
val parquetdf = 
spark.readStream.option("spark.sql.streaming.schemaInference","true").option("maxFilesPerTrigger","1").schema(inputDf.schema).parquet("/tmp/parquet_ny/")
 val df = parquetdf.
 
select("VendorID","tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance","RatecodeID","store_and_fwd_flag","PULocationID","DOLocationID","payment_type","fare_amount","extra","mta_tax","tip_amount","tolls_amount","improvement_surcharge","total_amount","congestion_surcharge",
 "date_col")  val writer = df.writeStream.format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
          option(PRECOMBINE_FIELD.key, "tpep_dropoff_datetime").
          option(RECORDKEY_FIELD.key, "tpep_pickup_datetime").
          option(PARTITIONPATH_FIELD.key, "date_col").
          option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
          option(STREAMING_IGNORE_FAILED_BATCH.key, false).
          option(STREAMING_RETRY_CNT.key, 0).
          option("hoodie.table.name", "cow_table").
          option("hoodie.compact.inline.max.delta.commits","2").
          option("checkpointLocation", 
"/tmp/hudi_streaming_kafka10/checkpoint_cow4/").
          outputMode("append"); writer.trigger(Trigger.ProcessingTime("1 
minutes")).start("/tmp/hudi_streaming_kafka10/COW4"); {code}

> Streaming writes fail in quick start w/ 0.14.0 
> -----------------------------------------------
>
>                 Key: HUDI-6693
>                 URL: https://issues.apache.org/jira/browse/HUDI-6693
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: spark, writer-core
>            Reporter: sivabalan narayanan
>            Assignee: Raymond Xu
>            Priority: Major
>
> Quick starts fails w/ streaming ingestion 
>  
> {code:java}
> scala> df.writeStream.format("hudi").
>      |   options(getQuickstartWriteConfigs).
>      |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>      |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>      |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>      |   option(TABLE_NAME, streamingTableName).
>      |   outputMode("append").
>      |   option("path", baseStreamingPath).
>      |   option("checkpointLocation", checkpointLocation).
>      |   trigger(Trigger.Once()).
>      |   start()
> warning: one deprecation; for details, enable `:setting -deprecation' or 
> `:replay -deprecation'
> 23/08/10 14:31:09 WARN HoodieStreamingSink: Ignore TableNotFoundException as 
> it is first microbatch.
> 23/08/10 14:31:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is 
> not supported in streaming DataFrames/Datasets and will be disabled.
> res12: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@75143003
> scala> 23/08/10 14:31:10 WARN HiveConf: HiveConf of name 
> hive.stats.jdbc.timeout does not exist
> 23/08/10 14:31:10 WARN HiveConf: HiveConf of name hive.stats.retries.wait 
> does not exist
> 23/08/10 14:31:10 WARN AutoRecordKeyGenerationUtils$: Precombine field ts 
> will be ignored with auto record key generation enabled
> 23/08/10 14:31:10 WARN HoodieWriteConfig: Embedded timeline server is 
> disabled, fallback to use direct marker type for spark
> 23/08/10 14:31:10 ERROR HoodieStreamingSink: Micro batch id=0 threw following 
> exception: 
> org.apache.spark.sql.AnalysisException: Queries with streaming sources must 
> be executed with writeStream.start();
> LogicalRDD [_hoodie_commit_time#1063, _hoodie_commit_seqno#1064, 
> _hoodie_record_key#1065, _hoodie_partition_path#1066, _hoodie_file_name#1067, 
> begin_lat#1068, begin_lon#1069, driver#1070, end_lat#1071, end_lon#1072, 
> fare#1073, partitionpath#1074, rider#1075, ts#1076L, uuid#1077], true
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:447)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:109)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:125)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:121)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:117)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:135)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:153)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:150)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:172)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:171)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:107)
>       at 
> org.apache.hudi.HoodieCreateRecordUtils$.createHoodieRecordRdd(HoodieCreateRecordUtils.scala:114)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:386)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:132)
>       at scala.util.Try$.apply(Try.scala:213)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:129)
>       at 
> org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:233)
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:128)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> 23/08/10 14:31:12 WARN AutoRecordKeyGenerationUtils$: Precombine field ts 
> will be ignored with auto record key generation enabled
> 23/08/10 14:31:12 WARN HoodieWriteConfig: Embedded timeline server is 
> disabled, fallback to use direct marker type for spark
> 23/08/10 14:31:12 ERROR HoodieStreamingSink: Micro batch id=0 threw following 
> exception: 
> org.apache.spark.sql.AnalysisException: Queries with streaming sources must 
> be executed with writeStream.start();
> LogicalRDD [_hoodie_commit_time#1063, _hoodie_commit_seqno#1064, 
> _hoodie_record_key#1065, _hoodie_partition_path#1066, _hoodie_file_name#1067, 
> begin_lat#1068, begin_lon#1069, driver#1070, end_lat#1071, end_lon#1072, 
> fare#1073, partitionpath#1074, rider#1075, ts#1076L, uuid#1077], true
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:447)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:109)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:125)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:121)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:117)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:135)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:153)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:150)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:172)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:171)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:107)
>       at 
> org.apache.hudi.HoodieCreateRecordUtils$.createHoodieRecordRdd(HoodieCreateRecordUtils.scala:114)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:386)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:132)
>       at scala.util.Try$.apply(Try.scala:213)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:129)
>       at 
> org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:233)
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:128)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> 23/08/10 14:31:16 WARN AutoRecordKeyGenerationUtils$: Precombine field ts 
> will be ignored with auto record key generation enabled
> 23/08/10 14:31:16 WARN HoodieWriteConfig: Embedded timeline server is 
> disabled, fallback to use direct marker type for spark
> 23/08/10 14:31:16 ERROR HoodieStreamingSink: Micro batch id=0 threw following 
> exception: 
> org.apache.spark.sql.AnalysisException: Queries with streaming sources must 
> be executed with writeStream.start();
> LogicalRDD [_hoodie_commit_time#1063, _hoodie_commit_seqno#1064, 
> _hoodie_record_key#1065, _hoodie_partition_path#1066, _hoodie_file_name#1067, 
> begin_lat#1068, begin_lon#1069, driver#1070, end_lat#1071, end_lon#1072, 
> fare#1073, partitionpath#1074, rider#1075, ts#1076L, uuid#1077], true
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:447)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:109)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:125)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:121)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:117)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:135)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:153)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:150)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:172)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:171)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:107)
>       at 
> org.apache.hudi.HoodieCreateRecordUtils$.createHoodieRecordRdd(HoodieCreateRecordUtils.scala:114)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:386)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:132)
>       at scala.util.Try$.apply(Try.scala:213)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:129)
>       at 
> org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:233)
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:128)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> 23/08/10 14:31:16 ERROR HoodieStreamingSink: Micro batch id=0 threw following 
> expections,aborting streaming app to avoid data loss: 
> org.apache.spark.sql.AnalysisException: Queries with streaming sources must 
> be executed with writeStream.start();
> LogicalRDD [_hoodie_commit_time#1063, _hoodie_commit_seqno#1064, 
> _hoodie_record_key#1065, _hoodie_partition_path#1066, _hoodie_file_name#1067, 
> begin_lat#1068, begin_lon#1069, driver#1070, end_lat#1071, end_lon#1072, 
> fare#1073, partitionpath#1074, rider#1075, ts#1076L, uuid#1077], true
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:447)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:262)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:262)
>       at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:109)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:107)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:125)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:121)
>       at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:117)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:135)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:153)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:150)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:172)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:171)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:107)
>       at 
> org.apache.hudi.HoodieCreateRecordUtils$.createHoodieRecordRdd(HoodieCreateRecordUtils.scala:114)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:386)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:132)
>       at scala.util.Try$.apply(Try.scala:213)
>       at 
> org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:129)
>       at 
> org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:233)
>       at 
> org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:128)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to