bkosuru opened a new issue, #5752:
URL: https://github.com/apache/hudi/issues/5752
Hello,
Bulk Insert fails in Hudi 0.11.0 when a partition path value contains the
characters `<` or `>` (see stacktrace below).
This is an HDFS-based Hudi table.
This issue is blocking us from upgrading to Hudi 0.11.0.
See related issues
https://github.com/apache/hudi/issues/5569 (Insert failed)
https://github.com/apache/hudi/issues/5741 (encoding does not happen for <>)
**To Reproduce**
Steps to reproduce the behavior:
```
val df = spark.read.format("hudi").load(input)
val sdf = df.select("s", "p", "o", "g", "isDeleted")
val count = sdf.count()
rowsPerFile = 5000000
val parallelism = Math.max(200, Math.ceil(count / rowsPerFile)).toInt
Spark settings:
new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.ui.enabled", "false")
.set("spark.sql.parquet.mergeSchema", "false")
.set("spark.sql.files.ignoreCorruptFiles", "true")
.set("spark.sql.hive.convertMetastoreParquet", "false")
spark-submit:
spark-submit
--master yarn
--deploy-mode cluster
--name kg-copy
--driver-memory 24G
--executor-memory 50G
--executor-cores 6
--num-executors 500
--conf spark.dynamicAllocation.enabled=False
--conf spark.network.timeout=240s
--conf spark.shuffle.sasl.timeout=60000
--conf spark.driver.maxResultSize=20g
--conf spark.port.maxRetries=60
--conf spark.shuffle.service.enabled=True
--conf spark.sql.shuffle.partitions=3000
--conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof"
--conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2
-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof"
--conf spark.driver.memoryOverhead=1024
--conf spark.executor.memoryOverhead=3072
--conf spark.yarn.max.executor.failures=100
--conf spark.kryoserializer.buffer.max=512m
--conf spark.task.maxFailures=4
--conf spark.rdd.compress=True
private val AVG_RECORD_SIZE: Int =
256 // approx bytes of our average record, contra Hudi default assumption of
1024
private val ONE_GIGABYTE: Int =
1024 * 1024 * 1024 // used for Parquet file size & block size
private val BLOOM_MAX_ENTRIES: Int = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
sdf.write
.format("hudi")
// DataSourceWriteOptions
.option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
.option( KEYGENERATOR_CLASS_OPT_KEY,"com.xyz.SpoKeyGenerator")
.option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL)
.option(INSERT_DROP_DUPS_OPT_KEY, value = false)
.option(INSERT_PARALLELISM, parallelism)
.option(PARTITIONPATH_FIELD_OPT_KEY, "g,p")
.option(PRECOMBINE_FIELD_OPT_KEY, "isDeleted")
.option(RECORDKEY_FIELD_OPT_KEY, "s,o")
.option(URL_ENCODE_PARTITIONING_OPT_KEY, value = true)
// HoodieCompactionConfig
.option(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key, 64)
// HoodieIndexConfig
.option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
.option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
.option(INDEX_TYPE.key, HoodieIndex.IndexType.BLOOM.name)
// HoodieStorageConfig
.option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
.option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
.option(PARQUET_FILE_MAX_BYTES,ONE_GIGABYTE)
// Commit history
.option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
.option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
.option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
// HoodieWriteConfig
.option(EMBEDDED_TIMELINE_SERVER_ENABLED, "false")
.option(TABLE_NAME, "spog")
.mode(SaveMode.Append)
class SpoKeyGenerator(props: TypedProperties)
extends ComplexKeyGenerator(props) {
def hash128(s: String): String = {
val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
h(0).toString + h(1).toString
}
override def getRecordKey(record: GenericRecord): String = {
val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false, false)
val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false, false)
genKey(s, o)
}
private def genKey(s: String, o: String): String = hash128(s + o)
override def getRecordKey(row: Row): String = {
val s = row.getAs(0).toString
val o = row.getAs(1).toString
genKey(s, o)
}
}
```
**Environment Description**
Hudi version : 0.11.0
Spark version : 2.4.4
Hive version :
Hadoop version :
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : no
**Stacktrace**
User class threw exception: org.apache.spark.SparkException: Writing job
aborted.
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:260)
at
org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:553)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:175)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at com.gsk.kg.common.HudiUtils$.insertBulk(HudiUtils.scala:96)
at com.gsk.kg.copy.Copy$$anonfun$copy$1.apply(Copy.scala:106)
at com.gsk.kg.copy.Copy$$anonfun$copy$1.apply(Copy.scala:83)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.gsk.kg.copy.Copy$.copy(Copy.scala:83)
at com.gsk.kg.copy.Copy$.main(Copy.scala:118)
at com.gsk.kg.copy.Copy.main(Copy.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 77 in stage 8.0 failed 4 times, most recent failure: Lost task
77.3 in stage 8.0 (TID 364803, us1salxhpw1306.corpnet2.com, executor 169):
java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal
character in scheme name at index 1:
g=<http://id.gsk.com/dataset/GTEx/%3E/p=%3Chttp:/gsk-kg.rdip.gsk.com/gtex/gene_gtex8_rsemv130_transcript_expected_count%23ensembl_gene_id%3E
at org.apache.hadoop.fs.Path.initialize(Path.java:259)
at org.apache.hadoop.fs.Path.<init>(Path.java:217)
at org.apache.hadoop.fs.Path.<init>(Path.java:125)
at org.apache.hudi.common.fs.FSUtils.getPartitionPath(FSUtils.java:611)
at org.apache.hudi.common.fs.FSUtils.getPartitionPath(FSUtils.java:606)
at
org.apache.hudi.io.storage.row.HoodieRowCreateHandle.makeNewPath(HoodieRowCreateHandle.java:172)
at
org.apache.hudi.io.storage.row.HoodieRowCreateHandle.<init>(HoodieRowCreateHandle.java:84)
at
org.apache.hudi.internal.BulkInsertDataInternalWriterHelper.getRowCreateHandle(BulkInsertDataInternalWriterHelper.java:165)
at
org.apache.hudi.internal.BulkInsertDataInternalWriterHelper.write(BulkInsertDataInternalWriterHelper.java:141)
at
org.apache.hudi.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:47)
at
org.apache.hudi.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:34)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1442)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.URISyntaxException: Illegal character in scheme name at
index 1:
g=<http://id.gsk.com/dataset/GTEx/%3E/p=%3Chttp:/gsk-kg.rdip.gsk.com/gtex/gene_gtex8_rsemv130_transcript_expected_count%23ensembl_gene_id%3E
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parse(URI.java:3048)
at java.net.URI.<init>(URI.java:746)
at org.apache.hadoop.fs.Path.initialize(Path.java:256)
... 24 more
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:930)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:930)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:930)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:741)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
... 51 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException:
Illegal character in scheme name at index 1:
g=<http://id.gsk.com/dataset/GTEx/%3E/p=%3Chttp:/gsk-kg.rdip.gsk.com/gtex/gene_gtex8_rsemv130_transcript_expected_count%23ensembl_gene_id%3E
at org.apache.hadoop.fs.Path.initialize(Path.java:259)
at org.apache.hadoop.fs.Path.<init>(Path.java:217)
at org.apache.hadoop.fs.Path.<init>(Path.java:125)
at org.apache.hudi.common.fs.FSUtils.getPartitionPath(FSUtils.java:611)
at org.apache.hudi.common.fs.FSUtils.getPartitionPath(FSUtils.java:606)
at
org.apache.hudi.io.storage.row.HoodieRowCreateHandle.makeNewPath(HoodieRowCreateHandle.java:172)
at
org.apache.hudi.io.storage.row.HoodieRowCreateHandle.<init>(HoodieRowCreateHandle.java:84)
at
org.apache.hudi.internal.BulkInsertDataInternalWriterHelper.getRowCreateHandle(BulkInsertDataInternalWriterHelper.java:165)
at
org.apache.hudi.internal.BulkInsertDataInternalWriterHelper.write(BulkInsertDataInternalWriterHelper.java:141)
at
org.apache.hudi.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:47)
at
org.apache.hudi.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:34)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1442)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.URISyntaxException: Illegal character in scheme name at
index 1:
g=<http://id.gsk.com/dataset/GTEx/%3E/p=%3Chttp:/gsk-kg.rdip.gsk.com/gtex/gene_gtex8_rsemv130_transcript_expected_count%23ensembl_gene_id%3E
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parse(URI.java:3048)
at java.net.URI.<init>(URI.java:746)
at org.apache.hadoop.fs.Path.initialize(Path.java:256)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]