parisni opened a new issue #3731:
URL: https://github.com/apache/hudi/issues/3731


   hudi 0.9.0, spark 3.1
   
   To experiment with OCC I set up these local tools:
   - local Hive metastore
   - PySpark script
   - run concurrently with xargs
   
   Sometimes it works as expected (mostly with 2 concurrent processes). But with 4 processes I randomly get one of these stack traces:
   
   Type 1 error:
   
   ```
    : org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object LockResponse(lockid:255, state:WAITING)
        at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:82)
        at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:64)
   ```
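   With 4 writers contending on a single metastore lock, the `12000` ms wait and `4` retries configured in the script below give each writer a fairly tight window before it gives up, so one slow commit can starve the others. A hypothetical tuning sketch, not a confirmed fix: the first two keys are the Hudi 0.9 lock configs already used in the repro script, the third is assumed from the Hudi write-config documentation, and all values are illustrative.

```python
# Illustrative only: loosen the lock-acquire window so concurrent writers
# have more headroom. "wait_time_ms_between_retry" is an assumption taken
# from the Hudi config reference, not from the original script.
lock_tuning = {
    "hoodie.write.lock.wait_time_ms": "60000",               # per-attempt wait
    "hoodie.write.lock.num_retries": "15",                   # acquire attempts
    "hoodie.write.lock.wait_time_ms_between_retry": "5000",  # back-off between attempts
}
```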
   
   Type 2 error:
   
   ```
    : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210921153357
        at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
        at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
    Caused by: java.lang.IllegalArgumentException
   ```
   
   Type 3 error:
   
   ```
    /tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:544)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:505)
    Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: file:/tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
   ```
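   The Type 3 failure looks consistent with two writers generating the same commit instant: the instants visible in the traces above (e.g. `20210921151138`) are second-granularity timestamps, so two jobs that start a commit within the same wall-clock second would race to create the same `.commit.requested` file. A minimal illustration of the collision, using only the 14-digit timestamp format seen in the traces:

```python
from datetime import datetime

# Two writers starting 800 ms apart, within the same wall-clock second,
# format to the identical 14-digit instant.
t1 = datetime(2021, 9, 21, 15, 11, 38, 100_000)
t2 = datetime(2021, 9, 21, 15, 11, 38, 900_000)
fmt = "%Y%m%d%H%M%S"
assert t1.strftime(fmt) == t2.strftime(fmt) == "20210921151138"
```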
   
   
   
   Steps to reproduce:
   
   Python script:
   
   ```python
   ## The idea is to generate a random partition per run.
   ## The jobs start with a small random delay, originally added to understand
   ## why I got the error on the same commit timestamp, but it is not actually needed.
   ## After a successful run there should be COUNT = (NB + 1) * 10 rows,
   ## where NB is the number of concurrent spark jobs.
   
   from pyspark.sql import SparkSession
   
   import pyspark
   from numpy import random
   from time import sleep
   
   sleeptime = random.uniform(2, 5)
   print("sleeping for:", sleeptime, "seconds")
   sleep(sleeptime)
   conf = pyspark.SparkConf()
   spark_conf = [
       (
           "spark.jars.packages",
           "org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.1.2",
       ),
       ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
       ("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083"),
       ("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive"),
       ("spark.hadoop.javax.jdo.option.ConnectionPassword", "hive"),
       ("spark.hadoop.hive.server2.thrift.url", "jdbc:hive2://localhost:10000"),
   ]
   conf.setAll(spark_conf)
   spark = (
       SparkSession.builder.appName("test-hudi-hive-sync")
       .config(conf=conf)
       .enableHiveSupport()
       .getOrCreate()
   )
   sc = spark.sparkContext
   
   # Create a table
   sc.setLogLevel("ERROR")
   dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
   inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
       dataGen.generateInserts(10)
   )
   from pyspark.sql.functions import expr
   
   df = (
       spark.read.json(spark.sparkContext.parallelize(inserts, 10))
       # One partition per run!
       .withColumn("part", expr(f"'foo{sleeptime}'"))
       .withColumn("id", expr("row_number() over(partition by 1 order by 1)"))
   )
   
   
   databaseName = "default"
   tableName = "test_hudi_pyspark_local"
   basePath = f"/tmp/{tableName}"
   
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.upsert.shuffle.parallelism": 2,
       "hoodie.insert.shuffle.parallelism": 2,
       # For hive sync metastore
       "hoodie.datasource.hive_sync.database": databaseName,
       "hoodie.datasource.hive_sync.table": tableName,
       "hoodie.datasource.hive_sync.mode": "jdbc",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.partition_fields": "part",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       # For concurrent write locks with hive metastore
       "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
       "hoodie.cleaner.policy.failed.writes": "LAZY",
       "hoodie.write.lock.provider": "org.apache.hudi.hive.HiveMetastoreBasedLockProvider",
       "hoodie.write.lock.hivemetastore.database": databaseName,
       "hoodie.write.lock.hivemetastore.table": tableName,
       "hoodie.write.lock.wait_time_ms": "12000",
       "hoodie.write.lock.num_retries": "4",
       "hoodie.embed.timeline.server": "false",
       "hoodie.datasource.write.commitmeta.key.prefix": "deltastreamer.checkpoint.key",
   }
   
   
   df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)
   print(
       "@@@@@@@@@@@@@@@@ COUNT={} @@@@@@@@@@@@@@@@@@".format(
           spark.read.format("hudi").load(basePath).count()
       )
   )
   ```
   
   Bash script:
   ```bash
   #!/usr/bin/env bash
   NB=$1
   rm -rf /tmp/test_hudi_pyspark_local/
   # First run creates the table; then NB writers append to it concurrently
   python3 concurrent.py
   seq 1 $NB | xargs -n 1 -P $NB python3 concurrent.py
   ```
   
   Run it:
   ```
   ./concurrent.sh 4
   ```
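   The COUNT printed at the end of each job can be checked against the comment at the top of the Python script: the warm-up run writes 10 rows and each of the NB concurrent jobs writes 10 more, so a fully successful run ends at (NB + 1) * 10 rows. A trivial helper to make that explicit (the function name is mine, for illustration):

```python
def expected_count(nb: int, rows_per_job: int = 10) -> int:
    """Rows expected after the warm-up run plus nb concurrent jobs."""
    return (nb + 1) * rows_per_job

# Running the bash script with NB=4 should end with COUNT=50
# when every writer succeeds.
print(expected_count(4))  # → 50
```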
   
   
   

