parisni opened a new issue #3731:
URL: https://github.com/apache/hudi/issues/3731
Hudi 0.9.0, Spark 3.1
To experiment with OCC (optimistic concurrency control) I set up these local tools:
- a local Hive metastore
- a PySpark script
- several copies of the script run concurrently with xargs
Sometimes it works as expected (mostly with 2 concurrent processes). But with
4 processes I randomly get one of these stack traces:
Type 1 error:
```
: org.apache.hudi.exception.HoodieLockException: Unable to acquire lock,
lock object LockResponse(lockid:255, state:WAITING)
at
org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:82)
at
org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:64)
```
Type 2 error:
```
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for
commit time 20210921153357
at
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
at
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
Caused by: java.lang.IllegalArgumentException
```
Type 3 error:
```
/tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:544)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:505)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already
exists:
file:/tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
```
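One possible reading of the Type 3 trace, offered as an assumption rather than a diagnosis: the instant name `20210921151138` has 14 digits, which suggests second resolution, so two writers that start a commit within the same second would ask for the same `.commit.requested` file. The sketch below only mimics that naming format to show the collision. `instant_time` and `requested_file` are hypothetical helpers, not Hudi code.

```python
from datetime import datetime

# Assumption: instant names use second resolution (yyyyMMddHHmmss), as the
# 14-digit name in the Type 3 trace suggests. This mimics the format only.
INSTANT_FORMAT = "%Y%m%d%H%M%S"


def instant_time(moment: datetime) -> str:
    """Format a point in time the way the instant in the trace is formatted."""
    return moment.strftime(INSTANT_FORMAT)


def requested_file(moment: datetime, action: str = "commit") -> str:
    """Build the name of the 'requested' marker file for that instant."""
    return f"{instant_time(moment)}.{action}.requested"


# Two hypothetical writers starting 800 ms apart, inside the same second.
writer_a = datetime(2021, 9, 21, 15, 11, 38, 100_000)
writer_b = datetime(2021, 9, 21, 15, 11, 38, 900_000)

print(requested_file(writer_a))
print(requested_file(writer_b))
print("collision:", requested_file(writer_a) == requested_file(writer_b))
```

If this reading holds, the random sleep in the reproduction script lowers the chance of a collision but cannot rule it out once several writers run side by side.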
Steps to reproduce:
Python script:
```python
## The idea is to generate a random partition per run.
## The jobs are started with a small random delay, in order to understand why
## I got the error on the same commit timestamp,
## but this is not actually needed.
## At the end there should be COUNT = (NB + 1) * 10, where NB is the number of
## concurrent Spark jobs.
from pyspark.sql import SparkSession
import pyspark
from numpy import random
from time import sleep
sleeptime = random.uniform(2, 5)
print("sleeping for:", sleeptime, "seconds")
sleep(sleeptime)
conf = pyspark.SparkConf()
spark_conf = [
    (
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.1.2",
    ),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083"),
    ("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive"),
    ("spark.hadoop.javax.jdo.option.ConnectionPassword", "hive"),
    ("spark.hadoop.hive.server2.thrift.url", "jdbc:hive2://localhost:10000"),
]
conf.setAll(spark_conf)
spark = (
    SparkSession.builder.appName("test-hudi-hive-sync")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext
# Create a table
sc.setLogLevel("ERROR")
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
    dataGen.generateInserts(10)
)
from pyspark.sql.functions import expr
df = (
    spark.read.json(spark.sparkContext.parallelize(inserts, 10))
    # One partition per run !!
    .withColumn("part", expr(f"'foo{sleeptime}'"))
    .withColumn("id", expr("row_number() over(partition by 1 order by 1)"))
)
databaseName = "default"
tableName = "test_hudi_pyspark_local"
basePath = f"/tmp/{tableName}"
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
    # For hive sync metastore
    "hoodie.datasource.hive_sync.database": databaseName,
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "jdbc",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "part",
    "hoodie.datasource.hive_sync.partition_extractor_class":
        "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    # For concurrency write locks with hive metastore
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider":
        "org.apache.hudi.hive.HiveMetastoreBasedLockProvider",
    "hoodie.write.lock.hivemetastore.database": databaseName,
    "hoodie.write.lock.hivemetastore.table": tableName,
    "hoodie.write.lock.wait_time_ms": "12000",
    "hoodie.write.lock.num_retries": "4",
    "hoodie.embed.timeline.server": "false",
    "hoodie.datasource.write.commitmeta.key.prefix":
        "deltastreamer.checkpoint.key",
}
df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)
print(
    "@@@@@@@@@@@@@@@@ COUNT={} @@@@@@@@@@@@@@@@@@".format(
        spark.read.format("hudi").load(basePath).count()
    )
)
```
Bash script:
```
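#!/usr/bin/env bash
# NB is the number of concurrent Spark jobs
NB="$1"
rm -rf /tmp/test_hudi_pyspark_local/
# A first, single run creates the table
python3 concurrent.py
# Then NB runs are started in parallel
seq 1 "$NB" | xargs -n 1 -P "$NB" python3 concurrent.py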
```
Run it:
```
./concurrent.sh 4
```
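For anyone who prefers to drive the reproduction from Python instead of xargs, the steps of the bash script can be sketched as below. This is a minimal sketch under assumptions: `expected_count`, `run_concurrently` and `reproduce` are hypothetical helper names, the script is assumed to be saved as `concurrent.py` in the working directory, and the expected count follows the `(NB + 1) * 10` rule stated in the script comments.

```python
import shutil
import subprocess
import sys


def expected_count(nb: int, rows_per_run: int = 10) -> int:
    """Rows expected after one initial run plus nb concurrent runs."""
    return (nb + 1) * rows_per_run


def run_concurrently(command: list, nb: int) -> list:
    """Start nb copies of command at once and return their exit codes."""
    procs = [subprocess.Popen(command) for _ in range(nb)]
    return [proc.wait() for proc in procs]


def reproduce(nb: int,
              script: str = "concurrent.py",
              table_path: str = "/tmp/test_hudi_pyspark_local") -> list:
    """Mirror the bash script: clean up, create the table, then run nb writers."""
    shutil.rmtree(table_path, ignore_errors=True)
    # The first, single run creates the table.
    subprocess.run([sys.executable, script], check=True)
    codes = run_concurrently([sys.executable, script], nb)
    print("exit codes:", codes, "expected COUNT:", expected_count(nb))
    return codes
```

Calling `reproduce(4)` corresponds to the run with 4 processes. A non-zero exit code marks a writer that failed with one of the stack traces above, and the last COUNT printed by the script can be compared with `expected_count(4)`, which is 50.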