li36909 commented on pull request #2751:
URL: https://github.com/apache/hudi/pull/2751#issuecomment-812271790


   @nsivabalan I ran the test on Hudi 0.7. Yes, you are right: I start a 
spark-shell for upserting and query the same table through the Spark 
datasource API, and then the problem arises. The cause of the problem is 
clear: during the query, Hudi gets the partitions in 
MergeOnReadSnapshotRelation and builds a new fsview in 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile; when a write operation 
happens in between, groupLogsByBaseFile sees some new base files.
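   The window that the sleep opens can be sketched with a small, hypothetical model (this is not Hudi code; the class and method names below are illustrative only): the query lists base files twice, and a write that commits between the two listings makes the two views disagree.

```java
// Hypothetical sketch (not Hudi code): models the race described above.
// The query takes the file listing twice; a write commits in between,
// so the second listing sees a base file the first one did not.
import java.util.HashSet;
import java.util.Set;

public class ListingRaceSketch {
    // Stands in for the table's base files on storage.
    private static final Set<String> baseFiles = new HashSet<>();

    // Stands in for building an fsview from a file listing.
    static Set<String> listBaseFiles() {
        return new HashSet<>(baseFiles);
    }

    // Stands in for a concurrent write committing a new base file.
    static void commitNewBaseFile(String name) {
        baseFiles.add(name);
    }

    public static void main(String[] args) {
        commitNewBaseFile("file_0.parquet");
        Set<String> firstView = listBaseFiles();   // partitions resolved (MergeOnReadSnapshotRelation)
        commitNewBaseFile("file_1.parquet");       // the concurrent write from step 3 lands here
        Set<String> secondView = listBaseFiles();  // fresh fsview built (groupLogsByBaseFile)
        secondView.removeAll(firstView);           // the base files the first listing never saw
        System.out.println("unexpected new base files: " + secondView);
        // prints: unexpected new base files: [file_1.parquet]
    }
}
```

   The Thread.sleep(60000) in the reproduction below simply widens the gap between the two listings so a manual write is easy to slip in between them.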
   We can reproduce this issue by adding a Thread.sleep(60000) in 
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile and then running this test:
   step 1: write first batch into a hudi table:
   spark-shell
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   
   val tableName = "hudi_mor_table"
   val basePath = "hdfs://hacluster/tmp/hudi_mor_table"
   val dataGen = new DataGenerator
   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   df.write.format("org.apache.hudi").
   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
   option("hoodie.datasource.write.operation", "bulk_insert").
   options(getQuickstartWriteConfigs).
   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
   option("hoodie.datasource.write.hive_style_partitioning", "false").
   option(TABLE_NAME, tableName).
   option("hoodie.datasource.hive_sync.enable", "true").
   option("hoodie.datasource.hive_sync.use_jdbc", "false").
   option("hoodie.datasource.hive_sync.table", "hudi_mor_test").
   option("hoodie.datasource.hive_sync.partition_extractor_class", 
"org.apache.hudi.hive.MultiPartKeysValueExtractor").
   option("hoodie.datasource.hive_sync.partition_fields", 
"continent,country,city").
   option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
   option("hoodie.insert.shuffle.parallelism", "2").
   option("hoodie.upsert.shuffle.parallelism","2").
   option("hoodie.bulkinsert.shuffle.parallelism", "2").
   option("hoodie.delete.shuffle.parallelism","2").
   mode(Append).
   save(basePath);
   
   step 2: run a query in a new spark-shell (while the query hangs at the 
Thread.sleep, start writing a new batch as in step 3):
   
spark.read.format("hudi").load("hdfs://hacluster/tmp/hudi_mor_table/*/*/*/*").count
   
   step 3: go back to the spark-shell from step 1 and write a new batch:
   df.write.format("org.apache.hudi").
   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
   option("hoodie.datasource.write.operation", "bulk_insert").
   options(getQuickstartWriteConfigs).
   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
   option("hoodie.datasource.write.hive_style_partitioning", "false").
   option(TABLE_NAME, tableName).
   option("hoodie.datasource.hive_sync.enable", "true").
   option("hoodie.datasource.hive_sync.use_jdbc", "false").
   option("hoodie.datasource.hive_sync.table", "hudi_mor_test").
   option("hoodie.datasource.hive_sync.partition_extractor_class", 
"org.apache.hudi.hive.MultiPartKeysValueExtractor").
   option("hoodie.datasource.hive_sync.partition_fields", 
"continent,country,city").
   option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
   option("hoodie.insert.shuffle.parallelism", "2").
   option("hoodie.upsert.shuffle.parallelism","2").
   option("hoodie.bulkinsert.shuffle.parallelism", "2").
   option("hoodie.delete.shuffle.parallelism","2").
   mode(Append).
   save(basePath);
   
   we can see that step 2 throws an exception.

