sunke38 opened a new issue, #5550:
URL: https://github.com/apache/hudi/issues/5550

   SparkHudi.scala
   
   I am trying to use spark-hudi to read data from Kafka and then write it to Hudi, but my 
code fails when it is submitted to Spark. It shows:
   
   
org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters;
 is abstract
   
   What is a possible reason for this error? Am I missing a dependency?
   `
   
   /**
    * Structured Streaming job that subscribes to the Kafka topic `spark-test`
    * and appends every micro-batch to two Hudi tables: one written with table
    * type `COPY_ON_WRITE` and one with `MERGE_ON_READ`.
    */
   object SparkHudi {
     val logger = Logger.getLogger(SparkHudi.getClass)

     /**
      * Appends `batchDF` to a Hudi table in `SaveMode.Append`.
      *
      * The record key is `kafka_partition_offset` (Kafka partition and offset
      * combined), the precombine field is `kafka_timestamp`, and the partition
      * path is `partition_date`, with hive-style partitioning switched on.
      *
      * @param batchDF   the micro-batch to write
      * @param tableType value passed for the `TABLE_TYPE` option
      * @param tableName value passed for `hoodie.table.name`
      * @param basePath  path handed to `save`
      */
     private def writeHudiTable(
         batchDF: DataFrame,
         tableType: String,
         tableName: String,
         basePath: String): Unit =
       batchDF.write.format("org.apache.hudi")
         .option(TABLE_TYPE.key(), tableType)
         .option(PRECOMBINE_FIELD.key(), "kafka_timestamp")
         // Use the Kafka partition and offset as the composite record key.
         .option(RECORDKEY_FIELD.key(), "kafka_partition_offset")
         // Use the current date as the partition.
         .option(PARTITIONPATH_FIELD.key(), "partition_date")
         .option("hoodie.table.name", tableName)
         .option(HIVE_STYLE_PARTITIONING.key(), true)
         .mode(SaveMode.Append)
         .save(basePath)

     /**
      * Handles one micro-batch: caches it, writes it to the copy-on-write table
      * and then to the merge-on-read table, and finally releases the cache.
      *
      * @param batchDF the micro-batch handed over by `foreachBatch`
      * @param batchId the batch id handed over by `foreachBatch` (not used)
      */
     private def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
       // The batch is written twice, so cache it once up front.
       batchDF.persist()
       try {
         println(s"${LocalDateTime.now()}start writing cow table")
         writeHudiTable(batchDF, "COPY_ON_WRITE", "copy_on_write_table", "/tmp/sparkHudi/COPY_ON_WRITE")

         println(s"${LocalDateTime.now()}start writing mor table")
         writeHudiTable(batchDF, "MERGE_ON_READ", "merge_on_read_table", "/tmp/sparkHudi/MERGE_ON_READ")

         println(s"${LocalDateTime.now()}finish")
       } finally {
         // Release the cached batch even when one of the writes throws.
         batchDF.unpersist()
       }
     }

     /**
      * Builds the session, wires the Kafka source to the Hudi sink and blocks
      * until the streaming query terminates.
      *
      * @param args command-line arguments (not used)
      */
     def main(args: Array[String]): Unit = {

       val spark = SparkSession
         .builder
         .appName("SparkHudi")
         //.master("local[*]")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.default.parallelism", 9)
         .config("spark.sql.shuffle.partitions", 9)
         .enableHiveSupport()
         .getOrCreate()

       // Add a listener: after each batch completes, print that batch's
       // information (e.g. starting offsets, number of records fetched,
       // processing time) to the console.
       spark.streams.addListener(new StreamingQueryListener() {
         override def onQueryStarted(queryStarted: QueryStartedEvent): Unit =
           println("Query started: " + queryStarted.id)

         override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit =
           println("Query terminated: " + queryTerminated.id)

         override def onQueryProgress(queryProgress: QueryProgressEvent): Unit =
           println("Query made progress: " + queryProgress.progress)
       })

       // Define the Kafka stream.
       val dataStreamReader = spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "10.10.9.202:9092")
         .option("subscribe", "spark-test")
         .option("startingOffsets", "latest")
         .option("maxOffsetsPerTrigger", 100000)
         .option("failOnDataLoss", false)

       // Load the stream data. This is only a test, so the Kafka messages are
       // read as-is without further processing; Spark Structured Streaming
       // automatically supplies the Kafka metadata of every message, such as
       // its topic, partition and offset.
       val df = dataStreamReader.load()
         .selectExpr(
           "topic as kafka_topic",
           "CAST(partition AS STRING) kafka_partition",
           "cast(timestamp as String) kafka_timestamp",
           "CAST(offset AS STRING) kafka_offset",
           "CAST(key AS STRING) kafka_key",
           "CAST(value AS STRING) kafka_value",
           "current_timestamp() current_time")
         .selectExpr(
           "kafka_topic",
           "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
           "kafka_offset",
           "kafka_timestamp",
           "kafka_key",
           "kafka_value",
           "substr(current_time,1,10) partition_date")

       // Build and start the query.
       // A named method with an explicit Unit result is passed instead of a
       // lambda ending in `return`: inside a lambda `return` is a nonlocal
       // return implemented by throwing a control exception, which would be
       // raised on the thread running the batch instead of returning from main.
       val query = df
         .writeStream
         .queryName("demo")
         .foreachBatch(writeBatch _)
         .option("checkpointLocation", "/tmp/sparkHudi/checkpoint/")
         .start()

       query.awaitTermination()
     }
   }`
   
   Environment Description
   
   Hudi version : 0.11
   
   Spark version : 3.2.1
   
   Hadoop version : 3.2.2
   
   Storage (HDFS/S3/GCS..) : HDFS
   
   Running on Docker? (yes/no) : no
   
   
   **Stacktrace**
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   Project [kafka_topic#21, concat(kafka_partition#22, -, kafka_offset#24) AS 
kafka_partition_offset#35, kafka_offset#24, kafka_timestamp#23, kafka_key#25, 
kafka_value#26, substr(cast(current_time#27 as string), 1, 10) AS 
partition_date#36]
   +- Project [topic#9 AS kafka_topic#21, cast(partition#10 as string) AS 
kafka_partition#22, cast(timestamp#12 as string) AS kafka_timestamp#23, 
cast(offset#11L as string) AS kafka_offset#24, cast(key#7 as string) AS 
kafka_key#25, cast(value#8 as string) AS kafka_value#26, current_timestamp() AS 
current_time#27]
      +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, 
offset#11L, timestamp#12, timestampType#13], 
org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1e23578c, 
KafkaV2[Subscribe[spark-test]]
   
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
   Caused by: java.lang.AbstractMethodError: Method 
org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters;
 is abstract
        at 
org.apache.spark.sql.adapter.Spark3_2Adapter.getAvroSchemaConverters(Spark3_2Adapter.scala)
        at 
org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:150)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:241)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at com.example.SparkHudi$.$anonfun$main$1(SparkHudi.scala:88)
        at com.example.SparkHudi$.$anonfun$main$1$adapted(SparkHudi.scala:74)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
        ... 1 more
   2022-05-10 11:01:47,576 INFO spark.SparkContext: Invoking stop() from 
shutdown hook
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to