pavimotorq opened a new issue, #7015:
URL: https://github.com/apache/hudi/issues/7015
**Describe the problem you faced**
I'm trying to read a file containing around 200K records in JSON format, partition it on the field "purpose", and store it as a Hudi table in a local HDFS cluster. The write always fails with the error below, but it succeeds if I point the base path at a non-HDFS location.
Error logs (excerpt; the full stacktrace is under **Stacktrace** below):
```
00:48 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/user/hive/warehouse/local_cow. Falling back to direct markers.
22/10/20 21:22:55 WARN DataStreamer: DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
```
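The HDFS message points at a client-side setting, so as a possible workaround (my assumption: on a single-datanode dev cluster there is simply no spare datanode to swap into a failed write pipeline) the replacement policy could be relaxed when building the session. A minimal sketch; I have not confirmed this resolves the Hudi failure:

```python
from pyspark.sql import SparkSession

# Workaround sketch for a single-datanode cluster: disable datanode
# replacement on pipeline failure. Either property alone should suffice;
# the spark.hadoop.* prefix forwards them to the underlying Hadoop
# Configuration, and the same keys can equally be set in hdfs-site.xml.
spark = SparkSession.builder \
    .appName("DFSHudi") \
    .config('spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy', 'NEVER') \
    .config('spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable', 'false') \
    .getOrCreate()
```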
**To Reproduce**
```python
import findspark
findspark.init('/home/pavithran/DFSHudi/spark')

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DFSHudi") \
    .config('spark.jars.packages',
            'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-azure:3.3.4') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .getOrCreate()

filtered_df = spark.read.option("multiLine", "true") \
    .json("hdfs://localhost:9000/data/dataset_batch1.json")
print((filtered_df.count(), len(filtered_df.columns)))

# Execute this before resuming session.
basePath = "hdfs://localhost:9000/user/hive/warehouse/local_cow"
tableName = "hudi_dfs_data"
```
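To see which replacement policy and replication factor the session actually picks up, a quick debugging sketch (`_jsc` is a private PySpark handle, used here only for ad-hoc inspection):

```python
# Print the effective HDFS client settings as seen by this Spark session;
# get() returns None when a key is unset and the cluster default applies.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
print(hconf.get('dfs.client.block.write.replace-datanode-on-failure.policy'))
print(hconf.get('dfs.replication'))
```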
**Expected behavior**
The dataset is written to the HDFS base path as a Hudi table partitioned on "purpose", just as it is when a non-HDFS (local filesystem) base path is used.
**Environment Description**
* Hudi version : 0.12
* Spark version : 3.3.0
* Hive version :
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
**Stacktrace**
```
00:48 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://localhost:9000/user/hive/warehouse/local_cow. Falling back to direct markers.
22/10/20 21:22:55 WARN DataStreamer: DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    at org.apache.hadoop.hdfs.DataStreamer.findNewDatanode(DataStreamer.java:1352)
    at org.apache.hadoop.hdfs.DataStreamer.addDatanode2ExistingPipeline(DataStreamer.java:1420)
    at org.apache.hadoop.hdfs.DataStreamer.handleDatanodeReplacement(DataStreamer.java:1646)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineInternal(DataStreamer.java:1547)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1529)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:717)
22/10/20 21:22:55 WARN DFSClient: Error while syncing
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    at org.apache.hadoop.hdfs.DataStreamer.findNewDatanode(DataStreamer.java:1352)
    at org.apache.hadoop.hdfs.DataStreamer.addDatanode2ExistingPipeline(DataStreamer.java:1420)
    at org.apache.hadoop.hdfs.DataStreamer.handleDatanodeReplacement(DataStreamer.java:1646)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineInternal(DataStreamer.java:1547)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1529)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:717)
22/10/20 21:22:55 ERROR BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :0
org.apache.hudi.exception.HoodieAppendException: Failed while appending records to hdfs://localhost:9000/user/hive/warehouse/local_cow/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-14-12
    at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:410)
    at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:382)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:84)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    at org.apache.hadoop.hdfs.DataStreamer.findNewDatanode(DataStreamer.java:1352)
    at org.apache.hadoop.hdfs.DataStreamer.addDatanode2ExistingPipeline(DataStreamer.java:1420)
    at org.apache.hadoop.hdfs.DataStreamer.handleDatanodeReplacement(DataStreamer.java:1646)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineInternal(DataStreamer.java:1547)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1529)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:717)
22/10/20 21:22:55 WARN BlockManager: Putting block rdd_94_0 failed due to exception org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0.
22/10/20 21:22:55 WARN BlockManager: Block rdd_94_0 could not be removed as it was not found on disk or in memory
22/10/20 21:22:55 ERROR Executor: Exception in task 0.0 in stage 46.0 (TID 1243)
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to hdfs://localhost:9000/user/hive/warehouse/local_cow/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-14-12
    at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:410)
    at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:382)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:84)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    at org.apache.hadoop.hdfs.DataStreamer.findNewDatanode(DataStreamer.java:1352)
    at org.apache.hadoop.hdfs.DataStreamer.addDatanode2ExistingPipeline(DataStreamer.java:1420)
    at org.apache.hadoop.hdfs.DataStreamer.handleDatanodeReplacement(DataStreamer.java:1646)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineInternal(DataStreamer.java:1547)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1529)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:717)
22/10/20 21:22:55 WARN TaskSetManager: Lost task 0.0 in stage 46.0 (TID 1243) (localhost executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to hdfs://localhost:9000/user/hive/warehouse/local_cow/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-14-12
    at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:410)
    at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:382)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:84)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 28 more
Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-9d9e9d11-c1d2-4ed0-bdf3-05709740ab9d,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    at org.apache.hadoop.hdfs.DataStreamer.findNewDatanode(DataStreamer.java:1352)
    at org.apache.hadoop.hdfs.DataStreamer.addDatanode2ExistingPipeline(DataStreamer.java:1420)
    at org.apache.hadoop.hdfs.DataStreamer.handleDatanodeReplacement(DataStreamer.java:1646)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineInternal(DataStreamer.java:1547)
    at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1529)
    at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:717)
22/10/20 21:22:55 ERROR TaskSetManager: Task 0 in stage 46.0 failed 1 times; aborting job
```
**My ultimate use case**: I want to be able to write the data with hive-style partitioning, sync it to Hive as part of the write itself, and query it using Presto.
```python
# REF: https://hudi.apache.org/docs/configurations/#SPARK_DATASOURCE
# REF: https://hudi.apache.org/docs/syncing_metastore (for syncing to the hive metastore)
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'key',
    'hoodie.datasource.write.partitionpath.field': 'purpose',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'startTime',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',  # default (optional)
    'hoodie.index.type': 'SIMPLE',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://hiveserver:10000/',
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.table': 'hudi_dfs_cow',
    'hoodie.datasource.hive_sync.partition_fields': 'purpose',
    'hoodie.datasource.hive_sync.enable': 'true'
}

# REF: https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html
filtered_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
```
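Once the write goes through, a small read-back check (assuming the table lands at `basePath`) to confirm the partition column survives the round trip before querying from Presto:

```python
# Read the table back through the Hudi datasource and inspect the
# hive-style "purpose" partitions.
hudi_df = spark.read.format("hudi").load(basePath)
hudi_df.groupBy("purpose").count().show()
```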