[ https://issues.apache.org/jira/browse/HUDI-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382691#comment-17382691 ]

Nishith Agarwal commented on HUDI-2146:
---------------------------------------

[~wenningd] I do see a conflict thrown when both inserts are started simultaneously.

insert 1
{code:java}
scala> df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
  .option("hoodie.write.lock.filesystem.path", "/tmp/")
  .option("hoodie.write.lock.hivemetastore.database", "test_db")
  .option("hoodie.write.lock.hivemetastore.table", "hudi_test")
  .option("hoodie.write.lock.hivemetastore.uris", "")
  .mode(SaveMode.Append)
  .save(tablePath)
21/07/18 01:38:55 WARN hudi.DataSourceWriteOptions$: hoodie.datasource.write.storage.type is deprecated and will be removed in a later release; Please use hoodie.datasource.write.table.type
org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
  at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:68)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
  at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
  at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:62)
  at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:456)
  at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:183)
  at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:564)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:230)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  ... 49 elided
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
  ... 82 more
{code}
insert 2
{code:java}
scala> df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
  .option("hoodie.write.lock.filesystem.path", "/tmp/")
  .option("hoodie.write.lock.hivemetastore.database", "test_db")
  .option("hoodie.write.lock.hivemetastore.table", "hudi_test")
  .option("hoodie.write.lock.hivemetastore.uris", "")
  .mode(SaveMode.Append)
  .save(tablePath)
21/07/18 01:38:53 WARN hudi.DataSourceWriteOptions$: hoodie.datasource.write.storage.type is deprecated and will be removed in a later release; Please use hoodie.datasource.write.table.type
scala>
{code}
Only two commits are present (the first created the table, the second is one of the inserts):
{code:java}
-rw-r--r--   1 root supergroup       2701 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013826.commit
-rw-r--r--   1 root supergroup          0 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013826.commit.requested
-rw-r--r--   1 root supergroup       1822 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013826.inflight
-rw-r--r--   1 root supergroup       2722 2021-07-18 01:40 /tmp/hudi_test/.hoodie/20210718013853.commit
-rw-r--r--   1 root supergroup          0 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013853.commit.requested
-rw-r--r--   1 root supergroup       1822 2021-07-18 01:39 /tmp/hudi_test/.hoodie/20210718013853.inflight
-rw-r--r--   1 root supergroup          0 2021-07-18 01:38 /tmp/hudi_test/.hoodie/20210718013855.commit.requested
-rw-r--r--   1 root supergroup       1822 2021-07-18 01:39 /tmp/hudi_test/.hoodie/20210718013855.inflight
{code}
I'm unable to reproduce the scenario you describe. Can you please try again and see whether you can reproduce it consistently? Also note that I'm testing against master.
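For context, the HoodieWriteConflictException in the transcript above is optimistic concurrency control working as intended: at pre-commit time the conflict resolution strategy checks whether a concurrently completed commit touched any of the same file groups, and aborts on overlap. A minimal sketch of that idea (plain Python with hypothetical names; not Hudi's actual implementation):

```python
# Toy sketch of optimistic-concurrency pre-commit conflict detection:
# a commit conflicts if a concurrently completed commit touched any of
# the same file IDs. All names here are illustrative, not Hudi's API.

class WriteConflictError(Exception):
    pass

def resolve_conflict(my_files, concurrent_commits):
    """my_files: set of file IDs this writer changed.
    concurrent_commits: file-ID sets of commits completed since we started."""
    for other_files in concurrent_commits:
        if my_files & other_files:
            # Overlapping writes to the same file group cannot be merged safely.
            raise WriteConflictError("Cannot resolve conflicts for overlapping writes")
    return True  # no overlap: safe to commit

# Two upserts landing in the same file group -> conflict, writer must abort.
try:
    resolve_conflict({"fg-1"}, [{"fg-1"}])
except WriteConflictError as e:
    print("conflict:", e)

# Writes touching disjoint file groups commit fine.
print(resolve_conflict({"fg-2"}, [{"fg-1"}]))  # True
```

So when the check fires, one writer fails loudly instead of silently shadowing the other's data, which is the behavior I observe on master.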

> Concurrent writes loss data 
> ----------------------------
>
>                 Key: HUDI-2146
>                 URL: https://issues.apache.org/jira/browse/HUDI-2146
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Priority: Blocker
>             Fix For: 0.9.0
>
>         Attachments: image-2021-07-08-00-49-30-730.png
>
>
> Reproduction steps:
> Create a Hudi table:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.hudi.AvroConversionUtils
> val df = Seq(
>   (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
>   (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_test"
> var tablePath = "s3://.../" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> {code}
> Perform two insert operations almost in the same time, each insertion 
> contains different data:
> Insert 1:
> {code:java}
> val df3 = Seq(
>   (400, "event_name_111111", "2125-02-01T13:51:39.340396Z", "type1"),
>   (401, "event_name_222222", "2125-02-01T12:14:58.597216Z", "type2"),
>   (404, "event_name_333433", "2126-01-01T12:15:00.512679Z", "type1"),
>   (405, "event_name_666378", "2125-07-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
>    .option("hoodie.cleaner.policy.failed.writes", "LAZY")
>    .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
>    .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
>    .option("hoodie.write.lock.zookeeper.port", "2181")
>    .option("hoodie.write.lock.zookeeper.lock_key", tableName)
>    .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
> Insert 2:
> {code:java}
> val df3 = Seq(
>   (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"),
>   (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"),
>   (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"),
>   (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
>    .option("hoodie.cleaner.policy.failed.writes", "LAZY")
>    .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
>    .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
>    .option("hoodie.write.lock.zookeeper.port", "2181")
>    .option("hoodie.write.lock.zookeeper.lock_key", tableName)
>    .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
> There's no exception or rollback during the insertions; however, when I check the Hudi table data, it contains only one of the inserts and the other is missing.
> Here is the timeline:
> !image-2021-07-08-00-49-30-730.png|width=840,height=322!
> I checked the parquet file for commit 20210706171250; it contains all of the inserted data. However, the parquet file for 20210706171252 contains only one insert's data. As you can also see, although 20210706171252 is the latest timestamp, its commit completes before 20210706171250's.
> So I guess the process is: insert 1 gets timestamp 20210706171250 -> insert 2 gets timestamp 20210706171252 -> insert 2 acquires the lock while insert 1 is blocked -> insert 2's data is committed to the Hudi table (20210706171252) -> insert 1's data is committed to the Hudi table (20210706171250).
> However, querying the Hudi table returns the data as of the latest timestamp, which is 20210706171252, so it only returns the data from insert 2.
> We need to check/update the timestamp somewhere before committing.
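The sequence hypothesized in the description above can be modeled with a toy simulation (plain Python; all names and structures are hypothetical, not Hudi code). It shows why, if two copy-on-write writers each obtain an instant time before taking the lock and both rewrite the same file group from the same starting view of the table, the file slice carrying the latest instant time can be missing the other writer's records:

```python
# Toy model of the hypothesized race: each writer stamps its output with
# the instant time it obtained *before* taking the lock, and a snapshot
# reader serves the file slice with the newest instant time.

file_group = {}  # instant_time -> set of record keys in that base file

def commit(instant_time, records, existing):
    # Copy-on-write: merge incoming records with the view of the table
    # the writer read when it started.
    file_group[instant_time] = existing | records

# Both writers start from the same initial view and get instants up front.
initial = {100, 101, 104, 105}
insert1_instant = "20210706171250"
insert2_instant = "20210706171252"

# Insert 2 acquires the lock first and commits, then insert 1 commits;
# neither sees the other's records because both started from `initial`.
commit(insert2_instant, {300, 301, 304, 305}, existing=initial)
commit(insert1_instant, {400, 401, 404, 405}, existing=initial)

# The reader picks the slice with the latest instant time, which was
# written *first* and never saw insert 1's records.
latest = max(file_group)
print(latest)                      # 20210706171252
print(sorted(file_group[latest]))  # insert 1's keys (400..405) missing
```

Under this model the data loss needs no failure at all: both commits "succeed", but insert 1's records live only in a slice that snapshot reads consider stale, matching the timeline in the description.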



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
