codope commented on issue #8236:
URL: https://github.com/apache/hudi/issues/8236#issuecomment-1527119084

   I could not reproduce this with the master branch. Here is my script:
   ```
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
   import org.apache.spark.sql.types.{IntegerType, StringType, LongType, 
StructType}
   import java.time.LocalDateTime
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.config.HoodieCompactionConfig
   import org.apache.spark.sql.streaming.OutputMode
   
   // Kafka streaming source for the "impressions" topic.
   // Reads from the earliest available offset, capped at 2000 records per
   // micro-batch, and does not fail when expired offsets are encountered.
   val dataStreamReader = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("subscribe", "impressions")
     .option("startingOffsets", "earliest") // also tried with "latest"
     .option("maxOffsetsPerTrigger", 2000)  // also tried with 1000, 5000
     .option("failOnDataLoss", false)
   
    // Schema of the JSON payload carried in each Kafka record's value.
    // NOTE(review): "impresssiontime" (triple "s") looks like a typo, but it is
    // kept as-is because the Hudi precombine field below uses the same spelling.
    val schema = new StructType()
      .add("impresssiontime", LongType)
      .add("impressionid", StringType)
      .add("userid", StringType)
      .add("adid", StringType)
   
    // Flatten the Kafka envelope, then parse the JSON value against `schema`
    // and project the payload fields alongside the Kafka metadata columns.
    val df = dataStreamReader.load()
      .selectExpr(
        "topic as kafka_topic",
        "CAST(partition AS STRING) kafka_partition",
        "cast(timestamp as String) kafka_timestamp",
        "CAST(offset AS STRING) kafka_offset",
        "CAST(key AS STRING) kafka_key",
        "CAST(value AS STRING) kafka_value",
        "current_timestamp() current_time")
      .selectExpr(
        "kafka_topic",
        // Unique per-record key built from partition and offset.
        "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
        "kafka_offset",
        "kafka_timestamp",
        "kafka_key",
        "kafka_value",
        // First 10 chars of the timestamp, i.e. the yyyy-MM-dd date.
        "substr(current_time,1,10) partition_date")
      .select(
        col("kafka_topic"),
        col("kafka_partition_offset"),
        col("kafka_offset"),
        col("kafka_timestamp"),
        col("kafka_key"),
        col("kafka_value"),
        from_json(col("kafka_value"), schema).as("data"),
        col("partition_date"))
      .select(
        "kafka_topic",
        "kafka_partition_offset",
        "kafka_offset",
        "kafka_timestamp",
        "kafka_key",
        "kafka_value",
        "data.impresssiontime",
        "data.impressionid",
        "data.userid",
        "data.adid",
        "partition_date")
   
   
   // Continuously upsert the stream into a Hudi MERGE_ON_READ table with
   // inline compaction and Hive sync over JDBC, then block until termination.
   val writer = df.writeStream
     .format("org.apache.hudi")
     .option(TABLE_TYPE.key, "MERGE_ON_READ")
     .option(TBL_NAME.key, "mor_table")
     // NOTE(review): same deliberate "impresssiontime" spelling as the schema.
     .option(PRECOMBINE_FIELD.key, "impresssiontime")
     .option(RECORDKEY_FIELD.key, "impressionid")
     .option(PARTITIONPATH_FIELD.key, "userid")
     .option(HIVE_SYNC_ENABLED.key, true)
     .option(HIVE_STYLE_PARTITIONING.key, true)
     .option(HoodieCompactionConfig.INLINE_COMPACT.key, true)
     .option(STREAMING_RETRY_CNT.key, 0)
     .option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
     .option("hoodie.datasource.hive_sync.database", "default")
     .option("hoodie.datasource.hive_sync.table", "mor_table")
     .option("hoodie.datasource.hive_sync.username", "hive")
     .option("hoodie.datasource.hive_sync.password", "hive")
     .option("hoodie.datasource.hive_sync.use_jdbc", true)
     .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hiveserver:10000")
     .option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/")
     .option("path", "/tmp/hudi_streaming_kafka/mor_table")
     .outputMode(OutputMode.Append())
     .start()

   writer.awaitTermination()
   ```
   This does not seem related to Spark Streaming. There was a bug in the 
implementation of `HoodieMergeOnReadTableInputFormat`, which is fixed by 
https://github.com/apache/hudi/commit/fe43e6f85d6d98db43b4fdc144654a07db032a28
   
   You can upgrade to the latest release of Hudi.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to