ChiehFu opened a new issue #3782:
URL: https://github.com/apache/hudi/issues/3782
Hello,
We are running jobs on AWS EMR to compact tables stored in S3, and we
maintain Athena tables through Hudi Hive sync.
Recently we started exploring Hudi multi-writer support and ran into several
issues when running concurrent Hudi upsert jobs with Hudi OCC, where each
job saves data into a distinct partition.
The errors seem to happen fairly randomly and become more frequent as the
number of concurrent jobs increases.
**Environment Description**
* Hudi version : 0.8.0
* Spark version : 2.4.7
* Hive version : 2.3.7
* Hadoop version : 2.10.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
* AWS EMR: 5.33.0
---
**Errors**
Type 1 error:
`FileAlreadyExistsException`
```
User class threw exception: org.apache.hudi.exception.HoodieIOException: Failed to create file s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs/.hoodie/20211011064521.commit.requested
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:526)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:487)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createNewInstant(HoodieActiveTimeline.java:147)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommit(AbstractHoodieWriteClient.java:714)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:700)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:691)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    ...
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs/.hoodie/20211011064521.commit.requested
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
```
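Our working theory for the Type 1 error (an assumption on our part, not confirmed from the Hudi source) is that Hudi 0.8.0 derives commit instant times at second granularity (`yyyyMMddHHmmss`), so two jobs that start a commit within the same wall-clock second try to create the same `.commit.requested` file, and the second create fails. A minimal sketch of the collision:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object InstantCollisionSketch {
  // Assumption: instant times in Hudi 0.8.0 are second-granularity
  // timestamps, so two writers starting a commit within the same second
  // derive the same file name (e.g. 20211011064521.commit.requested).
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  def instantTime(t: LocalDateTime): String = t.format(fmt)

  def main(args: Array[String]): Unit = {
    val base = LocalDateTime.of(2021, 10, 11, 6, 45, 21)
    val writerA = instantTime(base)                       // job 1 starts a commit
    val writerB = instantTime(base.plusNanos(400000000L)) // job 2, 0.4 s later
    // Both writers produce "20211011064521", so the second create of the
    // immutable .commit.requested file collides.
    println(s"writer A: $writerA, writer B: $writerB, collide: ${writerA == writerB}")
  }
}
```

This would also match the observation that the failures become more frequent as the number of concurrent jobs grows.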
Type 2 error:
`IllegalArgumentException`
```
java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:476)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
```
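For what it's worth, the bare `IllegalArgumentException` in the Type 2 trace comes from `ValidationUtils.checkArgument(boolean)`, which throws without a message, so the trace itself doesn't say which precondition in `transitionState` failed. A rough paraphrase of that pattern (our sketch, not Hudi's actual source):

```scala
object ValidationSketch {
  // Paraphrase (not the real Hudi class) of
  // org.apache.hudi.common.util.ValidationUtils.checkArgument: a failed
  // precondition throws a message-less IllegalArgumentException, which is
  // why the Type 2 stack trace carries no detail about what went wrong.
  def checkArgument(expression: Boolean): Unit =
    if (!expression) throw new IllegalArgumentException

  def main(args: Array[String]): Unit = {
    checkArgument(expression = true) // passes silently
    try {
      checkArgument(expression = false)
    } catch {
      case e: IllegalArgumentException =>
        // getMessage is null, matching the bare exception in the trace
        println(s"message: ${e.getMessage}")
    }
  }
}
```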
Type 3 error:
`HoodieRollbackException`
```
User class threw exception: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs commits 20211011064933
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:593)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:808)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:797)
    at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$clean$1cda88ee$1(AbstractHoodieWriteClient.java:648)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:135)
    at org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:647)
    at org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:630)
    at org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:661)
    at org.apache.hudi.client.AbstractHoodieWriteClient.autoCleanOnCommit(AbstractHoodieWriteClient.java:494)
    at org.apache.hudi.client.AbstractHoodieWriteClient.postCommit(AbstractHoodieWriteClient.java:431)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:186)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:476)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    ...
Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20211011064933__commit__REQUESTED]
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:196)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:173)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:225)
    ...
```
Type 4 error:
The Athena table wasn't updated by Hudi to include a new partition.
Our understanding is that with `hoodie.datasource.hive_sync.enable` set to
true, Hudi is supposed to keep the corresponding Athena table up to date with
any schema changes and new partitions.
However, we found that sometimes a partition is missing from the Athena table
even after the job completed successfully.
We were able to query the data from the missing partition in a Hudi
snapshot table, which suggests the data is there; the Athena table simply
did not pick up that partition.
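As a stopgap for the missed partition (purely our own sketch, assuming the table is partitioned by the single `partition` field as in the repro below and that the catalog accepts Hive-style DDL), the partition can be registered manually with `ALTER TABLE ... ADD PARTITION`. The helper below is hypothetical (not part of Hudi) and only builds the DDL string; the caller would run it via `spark.sql(...)` or the Athena console:

```scala
object PartitionRepairSketch {
  // Hypothetical helper: builds the Hive/Athena DDL that would register a
  // partition which hive sync missed. All names passed in are examples.
  def addPartitionDdl(db: String, table: String, partitionField: String,
                      partitionValue: String, tableLocation: String): String =
    s"ALTER TABLE $db.$table ADD IF NOT EXISTS " +
      s"PARTITION ($partitionField='$partitionValue') " +
      s"LOCATION '$tableLocation/$partitionValue'"

  def main(args: Array[String]): Unit =
    // "application_..." mirrors the Spark applicationId used as the
    // partition key in the repro code below (example value only).
    println(addPartitionDdl("test_db", "my_hudi_table", "partition",
      "application_0001", "s3://hudi_debug/test_db/my_hudi_table"))
}
```

This obviously doesn't fix the root cause; we'd still like to understand why hive sync skips the partition.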
---
**Error Reproduction**
To reproduce the errors, I built a Scala jar that generates test data and
saves it with `optimistic_concurrency_control` enabled, and ran 8 Spark
steps concurrently.
```scala
package com.hudioccdebug

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.sql.functions.{current_timestamp, lit, udf}

object HudiOccDebug {
  val spark: SparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
  val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    val dbName = "test_db"
    val tableName = args(0)
    val rowCount = args(1).toInt
    val batchNum = args(2).toInt
    val lockProvider = args(3)
    val s3Location = "s3://hudi_debug"
    val r = scala.util.Random
    val n = 60
    for (i <- 1 to batchNum) {
      println(s"Starting batch $i")
      // Sleep a random 0-59 seconds to stagger the concurrent upserts
      val m = (r.nextInt % n + n) % n
      println(s"sleep for $m seconds before upsert")
      Thread.sleep(m * 1000)
      val df = generateTestingDF(spark, rowCount, spark.sparkContext.applicationId)
      saveDf(s3Location, dbName, tableName, df,
        getHudiOptions(dbName, tableName, lockProvider))
    }
  }

  def generateTestingDF(spark: SparkSession, rowCount: Int = 1000000,
                        partitionKey: String): DataFrame = {
    val uuid = udf(() => java.util.UUID.randomUUID().toString)
    def randomStringGen(length: Int) =
      scala.util.Random.alphanumeric.take(length).mkString
    spark.sparkContext
      .parallelize(Seq.fill(rowCount) {
        (randomStringGen(4), randomStringGen(4), randomStringGen(6))
      }, 10)
      .toDF("col_1", "col_2", "col_3")
      .withColumn("partition", lit(partitionKey))
      .withColumn("uuid", uuid())
      .withColumn("ts", current_timestamp())
  }

  def getHudiOptions(dbName: String, tableName: String,
                     lockProvider: String): Map[String, String] = {
    def getHiveServerURI: String = {
      val hiveMetastoreURIs = new HiveConf().get("hive.metastore.uris")
      val parsedMetastoreURI =
        if (hiveMetastoreURIs != null) hiveMetastoreURIs.replaceAll("thrift://", "")
        else ":"
      parsedMetastoreURI.substring(0, parsedMetastoreURI.lastIndexOf(":"))
    }

    var m = Map[String, String](
      "hoodie.table.name" -> tableName,
      "hoodie.datasource.write.table.name" -> tableName,
      "hoodie.consistency.check.enabled" -> "true",
      "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
      "hoodie.datasource.write.recordkey.field" -> "uuid",
      "hoodie.datasource.write.keygenerator.class" ->
        "org.apache.hudi.keygen.ComplexKeyGenerator",
      "hoodie.datasource.write.partitionpath.field" -> "partition",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.parquet.max.file.size" -> String.valueOf(500 * 1024 * 1024),
      "hoodie.datasource.hive_sync.enable" -> "true",
      "hoodie.datasource.hive_sync.jdbcurl" ->
        s"jdbc:hive2://${getHiveServerURI}:10000",
      "hoodie.datasource.hive_sync.database" -> dbName,
      "hoodie.datasource.hive_sync.table" -> tableName,
      "hoodie.datasource.hive_sync.partition_fields" -> "partition",
      "hoodie.datasource.hive_sync.partition_extractor_class" ->
        "org.apache.hudi.hive.MultiPartKeysValueExtractor",
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.fail.on.timeline.archiving" -> "false"
    )
    if (lockProvider.equals("ZookeeperBasedLockProvider")) {
      print("Using ZookeeperBasedLockProvider")
      m += (
        "hoodie.write.lock.zookeeper.url" -> "ip-10-0-227-209.ec2.internal",
        "hoodie.write.lock.zookeeper.port" -> "2181",
        "hoodie.write.lock.zookeeper.lock_key" -> "test_table",
        "hoodie.write.lock.zookeeper.base_path" -> "/test",
        "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes" -> "LAZY",
        "hoodie.write.lock.provider" ->
          "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
        "hoodie.write.lock.client.num_retries" -> "20",
        "hoodie.write.lock.wait_time_ms" -> "20000",
        "hoodie.write.lock.wait_time_ms_between_retry" -> "5000"
      )
    }
    m
  }

  def saveDf(
      s3Location: String,
      dbName: String,
      tableName: String,
      inputDf: DataFrame,
      options: Map[String, String]
  ): Unit = {
    inputDf.write
      .format("org.apache.hudi")
      .options(options)
      .mode(SaveMode.Append)
      .save("%s/%s/%s".format(s3Location, dbName, tableName))
  }
}
```
```
spark-submit \
  --deploy-mode cluster \
  --executor-memory 43g \
  --driver-memory 43g \
  --executor-cores 6 \
  --class com.hudioccdebug.HudiOccDebug \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
  s3://hudi_debug/jars/hudi_occ_debug.jar \
  "hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs" "500000" "1" "ZookeeperBasedLockProvider"
```