Tandoy opened a new issue #3598:
URL: https://github.com/apache/hudi/issues/3598
**Describe the problem you faced**

HoodieDeltaStreamerException when using OCC with a second concurrent writer: after a few commits from each writer, the DeltaStreamer is likely to fail with the exception below once the datasource writer creates non-isolated inflight commits.
**To Reproduce**
Steps to reproduce the behavior:
1. Start a DeltaStreamer job against some table Foo
```
spark-submit \
--master yarn \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 4 \
--deploy-mode cluster \
--conf spark.yarn.executor.memoryOverhead=512 \
--conf spark.yarn.driver.memoryOverhead=512 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /home/appuser/tangzhi/hudi-0.8/hudi-release-0.8.0/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0.jar` \
--props file:///opt/apps/hudi/hudi-utilities/src/test/resources/delta-streamer-config/kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--target-base-path hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ \
--op UPSERT \
--continuous \
--target-table hudi_test_occ \
--table-type COPY_ON_WRITE \
--source-ordering-field uid \
--source-limit 5000000
```
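For a multi-writer setup, the DeltaStreamer side also needs the OCC and lock-provider settings. A minimal sketch of what the referenced `kafka.properties` would need to carry (the ZooKeeper host, port, and paths here mirror the datasource writer options below; everything else is an assumption, not the actual file):

```
# Sketch of the OCC settings for the DeltaStreamer writer (assumed, not the actual file)
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
# ZooKeeper-based lock provider; host/port/paths mirror the datasource writer
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=dxbigdata103
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=occ
hoodie.write.lock.zookeeper.base_path=/hudi/occ
```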
2. In parallel, start writing to the same table Foo using the Spark datasource writer
writer
```
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_test_occ"
val basePath = "hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ"
val inserts = Seq("""{"area":"hunan","uid":"1","itemid":"11","npgid":"43","evid":"addComment","os":"andriod","pgid":"30","appid":"gmall2019","mid":"mid_117","type":"event","ts":"2021-08-14 12:23:34"}""")
import spark.implicits._
val ds = spark.createDataset(inserts)
val df = spark.read.json(ds)
df.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option("hoodie.write.meta.key.prefixes", "deltastreamer.checkpoint.key")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "dxbigdata103")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "occ")
.option("hoodie.write.lock.zookeeper.base_path", "/hudi/occ")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
.option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING")
.option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd HH:mm:ss")
.option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd")
.option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00")
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "ts")
.option(TABLE_NAME, tableName)
.mode(Overwrite)
.save(basePath)
```
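Note that `.mode(Overwrite)` makes Spark re-create the table on each run, which discards the existing timeline, including the commit metadata the DeltaStreamer reads its checkpoint from. If the second writer is meant to add to table Foo rather than replace it, a sketch of the same write with `Append` (all other options unchanged; `Append` comes from the `SaveMode` import above):

```
// Sketch: identical to the write above, but appending to the existing
// table instead of overwriting it. SaveMode.Append preserves the
// timeline and any deltastreamer.checkpoint.* metadata recorded in it.
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  // ... same OCC, lock, and key-generator options as above ...
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)
```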
**Environment Description**

* Hudi version : 0.8
* Spark version : 2.4.0.cloudera2
* Hadoop version : 2.6.0-cdh5.13.3
* Hive version : 1.1.0-cdh5.13.3
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
**Stacktrace**
```
Caused by: org.apache.hudi.exception.HoodieException: Unable to find
previous checkpoint. Please double check if this table was indeed built via
delta streamer. Last Commit :Option{val=[20210903233550__commit__COMPLETED]},
Instants :[[20210903233550__commit__COMPLETED]], CommitMetadata={
"partitionToWriteStats" : {
"2021/08/14" : [ {
"fileId" : "4cfbfc21-273d-4b79-b8c0-11269b3b3112-0",
"path" :
"2021/08/14/4cfbfc21-273d-4b79-b8c0-11269b3b3112-0_0-22-22_20210903233550.parquet",
"prevCommit" : "null",
"numWrites" : 1,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 1,
"totalWriteBytes" : 436383,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "2021/08/14",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 436383,
"minEventTime" : null,
"maxEventTime" : null
} ]
},
```
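The message indicates the latest completed instant carries no `deltastreamer.checkpoint.key` entry in its metadata, i.e. it was produced by the datasource writer rather than the DeltaStreamer. One way to confirm which writer produced it is to inspect the timeline with hudi-cli (a sketch; the base path and instant time are taken from this report):

```
# Sketch: inspect the table's timeline with hudi-cli
hudi-cli
connect --path hdfs://dxbigdata101:8020/user/hudi/test/data/hudi_test_occ
commits show
commit showfiles --commit 20210903233550
```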
--
This is an automated message from the Apache Git Service.