Tandoy opened a new issue #3598:
URL: https://github.com/apache/hudi/issues/3598


   **Describe the problem you faced**
   `HoodieDeltaStreamerException` when using OCC with a second concurrent writer: after a few commits from each writer, the DeltaStreamer is likely to fail with the exception below once the datasource writer creates non-isolated inflight commits.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Start a DeltaStreamer job against some table Foo:
   ```
   spark-submit \
   --master yarn \
   --driver-memory 1G \
   --num-executors 2 \
   --executor-memory 1G \
   --executor-cores 4 \
   --deploy-mode cluster \
   --conf spark.yarn.executor.memoryOverhead=512 \
   --conf spark.yarn.driver.memoryOverhead=512 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /home/appuser/tangzhi/hudi-0.8/hudi-release-0.8.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0.jar` \
   --props file:///opt/apps/hudi/hudi-utilities/src/test/resources/delta-streamer-config/kafka.properties \
   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
   --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
   --target-base-path hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ \
   --op UPSERT \
   --continuous \
   --target-table hudi_test_occ \
   --table-type COPY_ON_WRITE \
   --source-ordering-field uid \
   --source-limit 5000000
   ```
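   For the DeltaStreamer side to actually participate in OCC, the referenced `kafka.properties` file would also need the concurrency and lock-provider settings. A sketch of what that fragment might look like (the file's real contents are not shown in this issue; the ZooKeeper host/port/lock values below are assumed to mirror the datasource writer's options, and the lock provider class is the standard Hudi ZooKeeper-based one):
   ```properties
   # Assumed OCC settings for the DeltaStreamer props file (not shown in the issue)
   hoodie.write.concurrency.mode=optimistic_concurrency_control
   hoodie.cleaner.policy.failed.writes=LAZY
   hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
   hoodie.write.lock.zookeeper.url=dxbigdata103
   hoodie.write.lock.zookeeper.port=2181
   hoodie.write.lock.zookeeper.lock_key=occ
   hoodie.write.lock.zookeeper.base_path=/hudi/occ
   ```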
   
   2. In parallel, start writing to the same table Foo using the Spark datasource writer:
   ```
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._

   val tableName = "hudi_test_occ"
   val basePath = "hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ"
   val inserts = Seq("""{"area":"hunan","uid":"1","itemid":"11","npgid":"43","evid":"addComment","os":"andriod","pgid":"30","appid":"gmall2019","mid":"mid_117","type":"event","ts":"2021-08-14 12:23:34"}""")

   import spark.implicits._
   val ds = spark.createDataset(inserts)
   val df = spark.read.json(ds)

   df.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     .option("hoodie.write.meta.key.prefixes", "deltastreamer.checkpoint.key")
     .option("hoodie.cleaner.policy.failed.writes", "LAZY")
     .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
     .option("hoodie.write.lock.zookeeper.url", "dxbigdata103")
     .option("hoodie.write.lock.zookeeper.port", "2181")
     .option("hoodie.write.lock.zookeeper.lock_key", "occ")
     .option("hoodie.write.lock.zookeeper.base_path", "/hudi/occ")
     .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
     .option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING")
     .option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd HH:mm:ss")
     .option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd")
     .option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00")
     .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
     .option(RECORDKEY_FIELD_OPT_KEY, "uid")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "ts")
     .option(TABLE_NAME, tableName)
     .mode(Overwrite)
     .save(basePath)
   ```
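   One thing worth noting about the repro above: the datasource writer uses `SaveMode.Overwrite`, which re-creates the table (and its timeline, including the commit metadata where the DeltaStreamer checkpoint is stored) on every run. For a second concurrent writer against an existing table, `Append` is what Hudi expects. A minimal sketch of that change, under the assumption that all other options stay as above:
   ```scala
   // Append preserves the existing timeline and the deltastreamer.checkpoint.key
   // carried in commit metadata; Overwrite replaces the whole table.
   df.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     // ... same OCC / key-generator options as above ...
     .mode(Append)
     .save(basePath)
   ```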
   
   **Environment Description**
   
   * Hudi version : 0.8
   * Spark version : 2.4.0.cloudera2
   * Hadoop version : 2.6.0-cdh5.13.3
   * Hive version : 1.1.0-cdh5.13.3
   * Storage (HDFS/S3/GCS..) : HDFS
   * Running on Docker? (yes/no) : no
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.hudi.exception.HoodieException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20210903233550__commit__COMPLETED]}, Instants :[[20210903233550__commit__COMPLETED]], CommitMetadata={
     "partitionToWriteStats" : {
       "2021/08/14" : [ {
         "fileId" : "4cfbfc21-273d-4b79-b8c0-11269b3b3112-0",
         "path" : "2021/08/14/4cfbfc21-273d-4b79-b8c0-11269b3b3112-0_0-22-22_20210903233550.parquet",
         "prevCommit" : "null",
         "numWrites" : 1,
         "numDeletes" : 0,
         "numUpdateWrites" : 0,
         "numInserts" : 1,
         "totalWriteBytes" : 436383,
         "totalWriteErrors" : 0,
         "tempPath" : null,
         "partitionPath" : "2021/08/14",
         "totalLogRecords" : 0,
         "totalLogFilesCompacted" : 0,
         "totalLogSizeCompacted" : 0,
         "totalUpdatedRecordsCompacted" : 0,
         "totalLogBlocks" : 0,
         "totalCorruptLogBlock" : 0,
         "totalRollbackBlocks" : 0,
         "fileSizeInBytes" : 436383,
         "minEventTime" : null,
         "maxEventTime" : null
       } ]
      },
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
