[
https://issues.apache.org/jira/browse/HUDI-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382691#comment-17382691
]
Nishith Agarwal commented on HUDI-2146:
---------------------------------------
[~wenningd] I see a conflict thrown when both inserts are started simultaneously.
Insert 1:
{code:java}
scala> df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
  .option("hoodie.write.lock.filesystem.path", "/tmp/")
  .option("hoodie.write.lock.hivemetastore.database", "test_db")
  .option("hoodie.write.lock.hivemetastore.table", "hudi_test")
  .option("hoodie.write.lock.hivemetastore.uris", "")
  .mode(SaveMode.Append)
  .save(tablePath)
21/07/18 01:38:55 WARN hudi.DataSourceWriteOptions$: hoodie.datasource.write.storage.type is deprecated and will be removed in a later release; Please use hoodie.datasource.write.table.type
org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
  at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:68)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
  at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
  at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:62)
  at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:456)
  at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:183)
  at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:564)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:230)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  ... 49 elided
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  ... 82 more
{code}
Insert 2:
{code:java}
scala> df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
  .option("hoodie.write.lock.filesystem.path", "/tmp/")
  .option("hoodie.write.lock.hivemetastore.database", "test_db")
  .option("hoodie.write.lock.hivemetastore.table", "hudi_test")
  .option("hoodie.write.lock.hivemetastore.uris", "")
  .mode(SaveMode.Append)
  .save(tablePath)
21/07/18 01:38:53 WARN hudi.DataSourceWriteOptions$: hoodie.datasource.write.storage.type is deprecated and will be removed in a later release; Please use hoodie.datasource.write.table.type
scala>
{code}
Only 2 commits are present (the first from creating the table and another from one of the inserts); the conflicting write (20210718013855) never progressed past inflight:
{code:java}
-rw-r--r--  1 root supergroup  2701 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013826.commit
-rw-r--r--  1 root supergroup     0 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013826.commit.requested
-rw-r--r--  1 root supergroup  1822 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013826.inflight
-rw-r--r--  1 root supergroup  2722 2021-07-18 01:40 /tmp/hudi_test/.hoodie/20210718013853.commit
-rw-r--r--  1 root supergroup     0 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013853.commit.requested
-rw-r--r--  1 root supergroup  1822 2021-07-18 01:39 /tmp/hudi_test/.hoodie/20210718013853.inflight
-rw-r--r--  1 root supergroup     0 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013855.commit.requested
-rw-r--r--  1 root supergroup  1822 2021-07-18 01:39 /tmp/hudi_test/.hoodie/20210718013855.inflight
{code}
I'm unable to reproduce the above scenario. Can you please try again and see if
you are able to reproduce it consistently? Also, I'm using master for my
testing.
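The interleaving described in the report below can be modeled without Spark. If the commit timestamp is taken before the table lock is acquired, the writer that commits last can still carry the earlier timestamp, and a snapshot read that serves the file slice with the latest timestamp shadows that writer's data. A toy sketch of that effect (editorial illustration only, not Hudi code; all names are invented):

```scala
// Toy model of the suspected race: file slices for one file group, keyed by
// the commit timestamp embedded in their file names. Not Hudi internals.
object TimestampRaceDemo {
  val slices = scala.collection.mutable.Map.empty[Long, Set[String]]

  // each COW commit writes a new slice stamped with the writer's timestamp,
  // regardless of the order in which the writers actually committed
  def commit(commitTs: Long, rows: Set[String]): Unit =
    slices(commitTs) = rows

  // a snapshot query serves the slice carrying the latest timestamp
  def snapshot(): Set[String] = slices(slices.keys.max)

  def main(args: Array[String]): Unit = {
    // insert 2 holds the later timestamp but wins the lock and commits first
    commit(20210706171252L, Set("insert2-rows"))
    // insert 1 commits afterwards, still under its earlier timestamp
    commit(20210706171250L, Set("insert1-rows"))
    // the reader picks the latest timestamp, so insert 1's rows are shadowed
    println(snapshot()) // Set(insert2-rows)
  }
}
```

The point is only that commit order and timestamp order can diverge; whether Hudi actually serves slices this way in this scenario is what the reproduction needs to confirm.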
> Concurrent writes lose data
> ----------------------------
>
> Key: HUDI-2146
> URL: https://issues.apache.org/jira/browse/HUDI-2146
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Wenning Ding
> Priority: Blocker
> Fix For: 0.9.0
>
> Attachments: image-2021-07-08-00-49-30-730.png
>
>
> Reproduction steps:
> Create a Hudi table:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.hudi.AvroConversionUtils
> val df = Seq(
> (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
> (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
> (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
> (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
> ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_test"
> var tablePath = "s3://.../" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
> .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
> DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
> .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .mode(SaveMode.Overwrite)
> .save(tablePath)
> {code}
> Perform two insert operations at almost the same time; each insert contains
> different data:
> Insert 1:
> {code:java}
> val df3 = Seq(
> (400, "event_name_111111", "2125-02-01T13:51:39.340396Z", "type1"),
> (401, "event_name_222222", "2125-02-01T12:14:58.597216Z", "type2"),
> (404, "event_name_333433", "2126-01-01T12:15:00.512679Z", "type1"),
> (405, "event_name_666378", "2125-07-01T13:51:42.248818Z", "type2")
> ).toDF("event_id", "event_name", "event_ts", "event_type")
> // update hudi dataset
> df3.write.format("org.apache.hudi")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
> .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
> DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
> .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.lock.provider",
> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
> .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
> .option("hoodie.write.lock.zookeeper.port", "2181")
> .option("hoodie.write.lock.zookeeper.lock_key", tableName)
> .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
> .mode(SaveMode.Append)
> .save(tablePath)
> {code}
> Insert 2:
> {code:java}
> val df3 = Seq(
> (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"),
> (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"),
> (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"),
> (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2")
> ).toDF("event_id", "event_name", "event_ts", "event_type")
> // update hudi dataset
> df3.write.format("org.apache.hudi")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
> .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
> DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
> .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.lock.provider",
> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
> .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
> .option("hoodie.write.lock.zookeeper.port", "2181")
> .option("hoodie.write.lock.zookeeper.lock_key", tableName)
> .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
> .mode(SaveMode.Append)
> .save(tablePath)
> {code}
> There's no exception/rollback during the insertions; however, when I check
> the Hudi table data, it only contains one of the inserts and the other one is missing.
> Here is the timeline:
> !image-2021-07-08-00-49-30-730.png|width=840,height=322!
> I checked the parquet file at 20210706171250: it contains all of the inserted
> data. However, the parquet file at 20210706171252 only contains the data from
> one insert. Also, you can see that although 20210706171252 is the latest
> timestamp, its commit happened before 20210706171250's.
> So I guess the process is:
> insert 1 gets timestamp (20210706171250) -> insert 2 gets timestamp
> (20210706171252) -> insert 2 takes the lock & insert 1 is blocked -> insert 2's
> data is committed to the Hudi table (20210706171252) -> insert 1's data is
> committed to the Hudi table (20210706171250)
> However, when querying the Hudi table, it returns the data as of the latest
> timestamp, which is 20210706171252, so it only returns the data from
> insert 2.
> We need to check/update the timestamp somewhere before committing.
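One way to realize the reporter's closing suggestion ("check/update the timestamp somewhere before committing") is to generate the commit timestamp only after the table lock is held, so that timestamp order always matches commit order. A minimal sketch of that idea (editorial toy code, not Hudi internals; all names are invented):

```scala
// Toy illustration: assigning the commit timestamp inside the critical
// section guarantees that a writer committing later also carries a later
// timestamp. Invented names; not Hudi code.
object TimestampInsideLock {
  private val lock = new Object
  private var clock = 0L
  // timestamps in the order their commits completed
  val committed = scala.collection.mutable.ArrayBuffer.empty[Long]

  def commitUnderLock(): Long = lock.synchronized {
    clock += 1          // timestamp generated while holding the lock...
    val ts = clock
    committed += ts     // ...so the commit log is always timestamp-ordered
    ts
  }
}
```

With this ordering, a snapshot read at the latest timestamp can no longer shadow a write that committed later.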
--
This message was sent by Atlassian Jira
(v8.3.4#803005)