cb149 opened a new issue #3984:
URL: https://github.com/apache/hudi/issues/3984
**Describe the problem you faced**
One of my applications is a Spark Structured Streaming job from Kafka to Hudi. It is scheduled to run once every hour, consumes about 3600 JSON messages per run, explodes them into about 7 million rows, and upserts those into a COW table partitioned by year=/month=/day=.
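For reference, the job has roughly this shape (a minimal sketch: the broker, topic, schema, key fields, and paths are placeholders, not my real configuration):
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("kafka-to-hudi").getOrCreate()
import spark.implicits._

// Placeholder schema: each Kafka message is one JSON document whose
// "records" array explodes into many rows.
val schema = new StructType()
  .add("records", ArrayType(new StructType()
    .add("id", StringType)
    .add("ts", LongType)
    .add("_partition", StringType)))

val rows = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder
  .load()
  .select(from_json($"value".cast("string"), schema).as("msg"))
  .select(explode($"msg.records").as("r"))
  .select("r.*")

rows.writeStream
  .format("hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "id")             // placeholder
  .option("hoodie.datasource.write.precombine.field", "ts")            // placeholder
  .option("hoodie.datasource.write.partitionpath.field", "_partition")
  .option("hoodie.table.name", "events_cow")                           // placeholder
  .option("checkpointLocation", "/tmp/checkpoints/events")             // placeholder
  .trigger(Trigger.Once()) // scheduled externally, one micro-batch per hour
  .start("/path/to/table") // placeholder
```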
After upgrading to 0.9.0 I am facing two problems:
### First:
With no change other than the upgrade to 0.9.0, my Spark job performs worse than on 0.8.0 and has even run OOM, although it had been running with the same configuration for several months without issues and there have been no changes to the data or the data volume.
Previously the job took about 2-4 minutes to ingest the data; now it takes 4-8 minutes.
The slowdown and the OutOfMemoryError show up in the `UpsertPartitioner` stage "Getting small files from partitions", even with 5 GB more memory than the job ran with on 0.8.0:
```
ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 15.0 GB of 15 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
```
The log for the task that ran OOM is included at the end of this ticket under **Stacktrace**.
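For reference, the `MaxMemoryPerPartitionMerge => 1073741824` line in that log is Hudi's merge-map memory budget. A minimal sketch of where that knob and the suggested YARN overhead live; the values are illustrative and I have not verified that raising them fixes the regression:
```scala
// Illustrative knobs only, not a verified fix.
// spark.yarn.executor.memoryOverhead has to be raised at submit time:
//   spark-submit --conf spark.yarn.executor.memoryOverhead=3072 ...
newDf.write.format("hudi")
  .option("hoodie.table.name", "events_cow")                           // placeholder
  .option("hoodie.datasource.write.partitionpath.field", "_partition")
  // Budget for the spillable map used while merging; the stacktrace's
  // "MaxMemoryPerPartitionMerge => 1073741824" (1 GB) is this budget.
  .option("hoodie.memory.merge.max.size", String.valueOf(2L * 1024 * 1024 * 1024))
  .mode("append")
  .save("/path/to/table")                                              // placeholder
```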
### Second:
Due to other limitations, all my tables are stored with
`DataSourceWriteOptions.PARTITIONPATH_FIELD -> "_partition"`,
and the `_partition` column contains strings like `"year=2021/month=11/day=12"`. I have used this across multiple Hudi versions, but 0.9.0 seems to cause issues: the table is still partitioned correctly, yet `spark.read.format("hudi").load(basePath)` no longer returns the columns year, month, or day, so the only way to do partition pruning is to filter on `_partition`.
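Concretely, the only pruning that still works on 0.9.0 looks like this (a minimal sketch against a quickstart-style `basePath`):
```scala
// year/month/day are no longer exposed as columns in 0.9.0, so pruning
// has to go through the partition-path column itself:
val oneDay = spark.read.format("hudi")
  .load(basePath)
  .filter($"_partition" === "year=2021/month=11/day=12")
```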
**To Reproduce**
Steps to reproduce the behavior:
1. Run step **Setup** from https://hudi.apache.org/docs/quick-start-guide
2. Create a dataframe and replace `partitionpath` with `continent=.../country=.../city=...`, e.g. `continent=asia/country=india/city=chennai`
```scala
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// Rewrites e.g. "asia/india/chennai" into "continent=asia/country=india/city=chennai"
val newDf = df.withColumn("partitionpath",
  regexp_replace($"partitionpath", "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
```
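A quick sanity check of the rewrite (the expected shape is shown in the comment; the quickstart generator produces partition paths like asia/india/chennai):
```scala
newDf.select("partitionpath").distinct().show(false)
// expected values like: continent=asia/country=india/city=chennai
```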
3. Write the table
```scala
newDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Overwrite).
save(basePath)
```
4. Query the table
```scala
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
```
5. Columns `continent`, `country`, and `city` are missing from `tripsSnapshotDF`:
```scala
scala> tripsSnapshotDF.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- partitionpath: string (nullable = true)
scala> spark.read.parquet(basePath)
res6: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields]
scala> res6.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- continent: string (nullable = true)
|-- country: string (nullable = true)
|-- city: string (nullable = true)
```
**Expected behavior**
As documented in https://hudi.apache.org/docs/quick-start-guide#query-data:
> `//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery`
Reading a table laid out as `partitionKey=partitionValue` should make the partition key available as a column for partition pruning, the same as `spark.read.parquet(basePath)` does.
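For example, the following pruning works today via the plain parquet reader, and should work through the Hudi reader as well once the columns are discovered:
```scala
// Works with spark.read.parquet(basePath); the same filter should be
// usable with spark.read.format("hudi") once continent/country/city
// show up in the schema again:
spark.read.parquet(basePath)
  .filter($"continent" === "asia" && $"country" === "india")
  .select("city", "fare")
  .show()
```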
In a shell with Hudi 0.8.0, these columns are available:
```scala
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow
scala> val tripsSnapshotDF = spark.
| read.
| format("hudi").
| load(basePath)
21/11/12 18:23:34 WARN DefaultSource: Loading Base File Only View.
21/11/12 18:23:34 WARN SizeEstimator: Failed to check whether
UseCompressedOops is set; assuming yes
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields]
scala> tripsSnapshotDF.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
|-- continent: string (nullable = true)
|-- country: string (nullable = true)
|-- city: string (nullable = true)
```
**Environment Description**
* Hudi version : 0.9.0
* Spark version : 2.4.0
* Hive version :
* Hadoop version : 3.0.0
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
**Stacktrace**
```
21/11/12 06:05:50 INFO util.ClusteringUtils: Found 0 files in pending clustering operations
21/11/12 06:05:50 INFO view.AbstractTableFileSystemView: Building file system view for partition (year=2021/month=11/day=12)
21/11/12 06:05:50 INFO view.AbstractTableFileSystemView: addFilesToView: NumFiles=15, NumFileGroups=9, FileGroupsCreationTime=12, StoreTimeTaken=1
21/11/12 06:05:50 INFO io.HoodieMergeHandle: MaxMemoryPerPartitionMerge => 1073741824
21/11/12 06:05:50 INFO collection.ExternalSpillableMap: Estimated Payload size => 768
21/11/12 06:05:50 INFO collection.ExternalSpillableMap: New Estimated Payload size => 550
21/11/12 06:05:57 INFO collection.BitCaskDiskMap: Spilling to file location /opt/cloudera/var/tmp/hudi-BITCASK-7980c51b-f1bb-46b2-806d-58bd72d7ec8f/09b0c846-6255-4407-acae-857c28ef1f82 in host (...) with hostname (...)
21/11/12 06:07:27 INFO io.HoodieMergeHandle: Number of entries in MemoryBasedMap => 1561807, Total size in bytes of MemoryBasedMap => 858993874, Number of entries in BitCaskDiskMap => 2114256, Size of file spilled to disk => 747196908
21/11/12 06:07:27 INFO io.HoodieMergeHandle: partitionPath:year=2021/month=11/day=12, fileId to be merged:0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0
21/11/12 06:07:27 INFO io.HoodieMergeHandle: Merging new data into oldPath /.../year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-52_20211112050651.parquet, as newPath /.../year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-89_20211112060041.parquet
21/11/12 06:07:27 INFO marker.DirectWriteMarkers: Creating Marker Path=/.../.hoodie/.temp/20211112060041/year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-89_20211112060041.parquet.marker.MERGE
21/11/12 06:07:27 INFO marker.DirectWriteMarkers: [direct] Created marker file /.../.hoodie/.temp/20211112060041/year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-89_20211112060041.parquet.marker.MERGE in 51 ms
21/11/12 06:07:27 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
21/11/12 06:07:27 INFO compress.CodecPool: Got brand-new compressor [.gz]
21/11/12 06:07:28 INFO queue.IteratorBasedQueueProducer: starting to buffer records
21/11/12 06:07:28 INFO queue.BoundedInMemoryExecutor: starting consumer thread
21/11/12 06:07:28 INFO compress.CodecPool: Got brand-new decompressor [.gz]
21/11/12 06:07:32 INFO queue.IteratorBasedQueueProducer: finished buffering records
21/11/12 06:07:33 INFO queue.BoundedInMemoryExecutor: Queue Consumption is done; notifying producer threads
21/11/12 06:09:01 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
21/11/12 06:09:01 ERROR commit.BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :0
java.lang.NullPointerException
    at org.apache.hudi.common.util.SpillableMapUtils.readInternal(SpillableMapUtils.java:58)
    at org.apache.hudi.common.util.SpillableMapUtils.readBytesFromDisk(SpillableMapUtils.java:49)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
    at org.apache.hudi.common.util.collection.LazyFileIterable$LazyFileIterator.next(LazyFileIterable.java:101)
    at org.apache.hudi.common.util.collection.ExternalSpillableMap$IteratorWrapper.next(ExternalSpillableMap.java:331)
    at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:350)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:107)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:334)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:325)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:298)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:156)
```