codope commented on issue #8236:
URL: https://github.com/apache/hudi/issues/8236#issuecomment-1527119084
I could not reproduce this with the master branch. Here is my script:
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
import org.apache.spark.sql.types.{IntegerType, StringType, LongType,
StructType}
import java.time.LocalDateTime
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.spark.sql.streaming.OutputMode
// Kafka source: stream the "impressions" topic from the local broker.
val dataStreamReader = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .option("startingOffsets", "earliest")  // also tried with "latest"
  .option("maxOffsetsPerTrigger", 2000)   // also tried with 1000, 5000
  .option("failOnDataLoss", false)
// Schema of the JSON payload carried in the Kafka message value.
// NOTE: "impresssiontime" (triple 's') is spelled this way consistently
// throughout the script — keep the field name as-is.
val schema = Seq(
  ("impresssiontime", LongType),
  ("impressionid", StringType),
  ("userid", StringType),
  ("adid", StringType)
).foldLeft(new StructType()) { case (acc, (fieldName, fieldType)) =>
  acc.add(fieldName, fieldType)
}
// Flatten the Kafka metadata columns, then parse the JSON message value
// into the impression fields and keep a date string for partitioning.
// FIX: the "substr(current_time,1,10) partition_date" expression was broken
// across two lines by the mail formatter, which is not a valid Scala string
// literal — it must be a single literal, as restored below.
val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "cast(timestamp as String) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date").
  select(
    col("kafka_topic"),
    col("kafka_partition_offset"),
    col("kafka_offset"),
    col("kafka_timestamp"),
    col("kafka_key"),
    col("kafka_value"),
    // Parse the JSON string into a struct column named "data".
    from_json(col("kafka_value"), schema).as("data"),
    col("partition_date")).
  select(
    "kafka_topic",
    "kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "data.impresssiontime",
    "data.impressionid",
    "data.userid",
    "data.adid",
    "partition_date")
// Hudi streaming sink: MERGE_ON_READ table, upsert semantics, inline
// compaction, Hive sync over JDBC. Options are grouped by concern; each
// .option call just records a key/value pair, so grouping does not change
// behavior.
val writer = df.writeStream
  .format("org.apache.hudi")
  // Table layout and record keys
  .option(TABLE_TYPE.key, "MERGE_ON_READ")
  .option(TBL_NAME.key, "mor_table")
  .option(PRECOMBINE_FIELD.key, "impresssiontime")
  .option(RECORDKEY_FIELD.key, "impressionid")
  .option(PARTITIONPATH_FIELD.key, "userid")
  // Write behavior
  .option(HoodieCompactionConfig.INLINE_COMPACT.key, true)
  .option(STREAMING_RETRY_CNT.key, 0)
  .option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
  // Hive sync
  .option(HIVE_SYNC_ENABLED.key, true)
  .option(HIVE_STYLE_PARTITIONING.key, true)
  .option("hoodie.datasource.hive_sync.database", "default")
  .option("hoodie.datasource.hive_sync.table", "mor_table")
  .option("hoodie.datasource.hive_sync.username", "hive")
  .option("hoodie.datasource.hive_sync.password", "hive")
  .option("hoodie.datasource.hive_sync.use_jdbc", true)
  .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hiveserver:10000")
  // Checkpoint and target paths
  .option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/")
  .option("path", "/tmp/hudi_streaming_kafka/mor_table")
  .outputMode(OutputMode.Append())
  .start()

// Block until the streaming query terminates.
writer.awaitTermination()
```
This does not seem related to Spark Structured Streaming. There was a bug
in the implementation of `HoodieMergeOnReadTableInputFormat`, which has
been fixed by
https://github.com/apache/hudi/commit/fe43e6f85d6d98db43b4fdc144654a07db032a28
You can upgrade to the latest release of Hudi.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]