sunke38 opened a new issue, #5550:
URL: https://github.com/apache/hudi/issues/5550
SparkHudi.scala
I am trying to use Spark with Hudi to read data from Kafka and write it to a Hudi table, but my job fails after being submitted to Spark with:

`org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters; is abstract`

What could cause this error? Am I missing a dependency?
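My understanding is that with Hudi 0.11 the bundle jar has to match the Spark minor version exactly, so for Spark 3.2.1 I would expect dependencies roughly like the following (an sbt sketch of what I believe the matching coordinates are; please correct me if this is wrong):

```scala
// build.sbt (sketch) — coordinates I believe match the versions below:
// Scala 2.12, Spark 3.2.1, Hudi 0.11.0
libraryDependencies ++= Seq(
  // Kafka source for Structured Streaming; must match the Spark version
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.1",
  // Hudi bundle built against Spark 3.2 (resolves to hudi-spark3.2-bundle_2.12);
  // mixing in the spark3.1 bundle or a stale spark-avro jar is the kind of
  // thing that can produce an AbstractMethodError like the one above
  "org.apache.hudi" %% "hudi-spark3.2-bundle" % "0.11.0"
)
```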
```scala
package com.example

import java.time.LocalDateTime

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object SparkHudi {

  val logger = Logger.getLogger(SparkHudi.getClass)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("SparkHudi")
      //.master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", 9)
      .config("spark.sql.shuffle.partitions", 9)
      .enableHiveSupport()
      .getOrCreate()

    // Add a listener: when each micro-batch completes, print its details
    // (starting offsets, records fetched, processing time) to the console.
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
      }
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
      }
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
      }
    })

    // Define the Kafka source.
    val dataStreamReader = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.10.9.202:9092")
      .option("subscribe", "spark-test")
      .option("startingOffsets", "latest")
      .option("maxOffsetsPerTrigger", 100000)
      .option("failOnDataLoss", false)

    // Load the stream. Since this is only a test, read the Kafka messages
    // directly without further processing; Structured Streaming automatically
    // attaches the Kafka metadata of each message (topic, partition, offset, ...).
    val df = dataStreamReader.load()
      .selectExpr(
        "topic as kafka_topic",
        "CAST(partition AS STRING) kafka_partition",
        "cast(timestamp as String) kafka_timestamp",
        "CAST(offset AS STRING) kafka_offset",
        "CAST(key AS STRING) kafka_key",
        "CAST(value AS STRING) kafka_value",
        "current_timestamp() current_time")
      .selectExpr(
        "kafka_topic",
        "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
        "kafka_offset",
        "kafka_timestamp",
        "kafka_key",
        "kafka_value",
        "substr(current_time,1,10) partition_date")

    // Create and start the query.
    val query = df
      .writeStream
      .queryName("demo")
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        batchDF.persist()

        println(LocalDateTime.now() + " start writing cow table")
        batchDF.write.format("org.apache.hudi")
          .option(TABLE_TYPE.key(), "COPY_ON_WRITE")
          .option(PRECOMBINE_FIELD.key(), "kafka_timestamp")
          // Use the Kafka partition and offset as a composite record key.
          .option(RECORDKEY_FIELD.key(), "kafka_partition_offset")
          // Partition by the current date.
          .option(PARTITIONPATH_FIELD.key(), "partition_date")
          .option("hoodie.table.name", "copy_on_write_table")
          .option(HIVE_STYLE_PARTITIONING.key(), true)
          .mode(SaveMode.Append)
          .save("/tmp/sparkHudi/COPY_ON_WRITE")

        println(LocalDateTime.now() + " start writing mor table")
        batchDF.write.format("org.apache.hudi")
          .option(TABLE_TYPE.key(), "MERGE_ON_READ")
          //.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
          .option(PRECOMBINE_FIELD.key(), "kafka_timestamp")
          .option(RECORDKEY_FIELD.key(), "kafka_partition_offset")
          .option(PARTITIONPATH_FIELD.key(), "partition_date")
          .option("hoodie.table.name", "merge_on_read_table")
          .option(HIVE_STYLE_PARTITIONING.key(), true)
          .mode(SaveMode.Append)
          .save("/tmp/sparkHudi/MERGE_ON_READ")

        println(LocalDateTime.now() + " finish")
        batchDF.unpersist()
        () // end with Unit so the Scala foreachBatch overload is selected
      }
      .option("checkpointLocation", "/tmp/sparkHudi/checkpoint/")
      .start()

    query.awaitTermination()
  }
}
```
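(The bare `()` at the end of the `foreachBatch` closure is there because in Scala 2.12 the lambda is otherwise ambiguous between the Scala `(DataFrame, Long) => Unit` overload and the Java `VoidFunction2` one; my original code used `return unbox(Unit)` as a workaround for the same issue.)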
**Environment Description**

* Hudi version : 0.11
* Spark version : 3.2.1
* Hadoop version : 3.2.2
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
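Since an `AbstractMethodError` normally means that incompatible versions of a class ended up on the classpath, a quick diagnostic along these lines (just a sketch, e.g. from `spark-shell` launched with the same jars) should show which jar the adapter class from the stacktrace is actually loaded from:

```scala
// Diagnostic sketch: print the jar that Spark3_2Adapter was loaded from.
// If this is not the hudi-spark3.2 bundle jar, the classpath mixes versions.
val loc = Class
  .forName("org.apache.spark.sql.adapter.Spark3_2Adapter")
  .getProtectionDomain.getCodeSource.getLocation
println(loc)
```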
**Stacktrace**

```
Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [kafka_topic#21, concat(kafka_partition#22, -, kafka_offset#24) AS kafka_partition_offset#35, kafka_offset#24, kafka_timestamp#23, kafka_key#25, kafka_value#26, substr(cast(current_time#27 as string), 1, 10) AS partition_date#36]
+- Project [topic#9 AS kafka_topic#21, cast(partition#10 as string) AS kafka_partition#22, cast(timestamp#12 as string) AS kafka_timestamp#23, cast(offset#11L as string) AS kafka_offset#24, cast(key#7 as string) AS kafka_key#25, cast(value#8 as string) AS kafka_value#26, current_timestamp() AS current_time#27]
   +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1e23578c, KafkaV2[Subscribe[spark-test]]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: java.lang.AbstractMethodError: Method org/apache/spark/sql/adapter/Spark3_2Adapter.getAvroSchemaConverters()Lorg/apache/spark/sql/avro/HoodieAvroSchemaConverters; is abstract
    at org.apache.spark.sql.adapter.Spark3_2Adapter.getAvroSchemaConverters(Spark3_2Adapter.scala)
    at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:150)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:241)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at com.example.SparkHudi$.$anonfun$main$1(SparkHudi.scala:88)
    at com.example.SparkHudi$.$anonfun$main$1$adapted(SparkHudi.scala:74)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    ... 1 more
2022-05-10 11:01:47,576 INFO spark.SparkContext: Invoking stop() from shutdown hook
```