cb149 opened a new issue #3984:
URL: https://github.com/apache/hudi/issues/3984


   **Describe the problem you faced**
   
   One of the applications where I use Hudi is a Spark Structured Streaming job from Kafka to Hudi. It is scheduled to run once an hour and reads about 3600 JSON messages per run, which are exploded into about 7 million rows and upserted into a COW table partitioned by `year=`/`month=`/`day=`.
   
   After upgrading to 0.9.0 I am facing two problems:
   
   ### First:
   With no changes other than the upgrade to 0.9.0, my Spark job is slower than on 0.8.0 and has even run out of memory (OOM), although it ran with the same configuration for several months without any issues and neither the data nor its volume has changed.
   Previously, the job took about 2-4 minutes to ingest the data; now it takes 4-8 minutes.
   
   The slowdown and the OOM error both occur in the `UpsertPartitioner` stages ("Getting small files from partitions"), even with 5 GB more memory than the job used with 0.8.0:
   ```
   ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 15.0 GB of 15 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
   ```
   
   The log of the task that ran out of memory is included at the end of this ticket.
   
   ### Second:
   Due to other limitations, all my tables are written with `DataSourceWriteOptions.PARTITIONPATH_FIELD -> "_partition"`, and the `_partition` column contains strings such as `"year=2021/month=11/day=12"`. This has worked across several Hudi versions, but 0.9.0 seems to break it. The table is still partitioned correctly on storage, but `spark.read.format("hudi").load(basePath)` no longer returns the columns `year`, `month` and `day`, so the only way to get partition pruning is to filter on `_partition`.
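
The `_partition` convention above can be sketched in plain Scala. The object and helper names are hypothetical, and the absence of zero-padding is an assumption, since the ticket only shows the single value `year=2021/month=11/day=12`. The second helper illustrates the `_partition` filter workaround for a date range: without zero-padding, a lexicographic range filter on the string would be wrong, so the values are enumerated instead.

```scala
import java.time.LocalDate

// Hypothetical sketch of the `_partition` convention; no zero-padding assumed.
object PartitionValues {

  // Builds the Hive-style value stored in the `_partition` column.
  def partitionFor(date: LocalDate): String =
    s"year=${date.getYear}/month=${date.getMonthValue}/day=${date.getDayOfMonth}"

  // Lists every `_partition` value between two dates (both inclusive).
  // A string range filter would misorder values ("month=10" sorts before
  // "month=9"), so the partitions are enumerated explicitly.
  def partitionsBetween(from: LocalDate, to: LocalDate): Seq[String] =
    Iterator
      .iterate(from)(_.plusDays(1))
      .takeWhile(!_.isAfter(to))
      .map(partitionFor)
      .toList
}
```

When reading the table, such a list could be passed to Spark's `Column.isin`, e.g. `col("_partition").isin(values: _*)`.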
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Run step **Setup** from https://hudi.apache.org/docs/quick-start-guide
   2. Create a DataFrame and rewrite `partitionpath` as `continent=.../country=.../city=...`, e.g. `continent=asia/country=india/city=chennai`
   ```scala
   import org.apache.spark.sql.functions.regexp_replace

   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   // e.g. "asia/india/chennai" -> "continent=asia/country=india/city=chennai"
   val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
   ```
   3. Write the table
   ```scala
   newDf.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD.key(), "ts").
     option(RECORDKEY_FIELD.key(), "uuid").
     option(PARTITIONPATH_FIELD.key(), "partitionpath").
     option(TBL_NAME.key(), tableName).
     mode(Overwrite).
     save(basePath)
   ```
   4. Query the table
   ```scala
   val tripsSnapshotDF = spark.
     read.
     format("hudi").
     load(basePath)
   ```
   5. The columns `continent`, `country` and `city` are missing from `tripsSnapshotDF`, whereas `spark.read.parquet(basePath)` returns them
   ```scala
   scala> tripsSnapshotDF.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- begin_lat: double (nullable = true)
    |-- begin_lon: double (nullable = true)
    |-- driver: string (nullable = true)
    |-- end_lat: double (nullable = true)
    |-- end_lon: double (nullable = true)
    |-- fare: double (nullable = true)
    |-- rider: string (nullable = true)
    |-- ts: long (nullable = true)
    |-- uuid: string (nullable = true)
    |-- partitionpath: string (nullable = true)
   
   
   scala> spark.read.parquet(basePath)
   res6: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields]
   
   scala> res6.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- begin_lat: double (nullable = true)
    |-- begin_lon: double (nullable = true)
    |-- driver: string (nullable = true)
    |-- end_lat: double (nullable = true)
    |-- end_lon: double (nullable = true)
    |-- fare: double (nullable = true)
    |-- partitionpath: string (nullable = true)
    |-- rider: string (nullable = true)
    |-- ts: long (nullable = true)
    |-- uuid: string (nullable = true)
    |-- continent: string (nullable = true)
    |-- country: string (nullable = true)
    |-- city: string (nullable = true)
   ```
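
Spark's `regexp_replace` is backed by `java.util.regex`, so the rewrite from step 2 can be checked without a Spark session by applying the same pattern and replacement with `String.replaceAll`. This is a minimal plain-Scala sketch; the object name is hypothetical.

```scala
// Plain-Scala check of the partitionpath rewrite from step 2.
// Uses the same Java regex pattern and $n replacement syntax as regexp_replace.
object QuickstartPartitionRewrite {
  private val PathPattern = "(.*)(\\/){1}(.*)(\\/){1}"
  private val HiveStyle   = "continent=$1$2country=$3$4city="

  // "asia/india/chennai" -> "continent=asia/country=india/city=chennai"
  def rewrite(partitionPath: String): String =
    partitionPath.replaceAll(PathPattern, HiveStyle)
}
```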
   
   **Expected behavior**
   As documented in https://hudi.apache.org/docs/quick-start-guide#query-data,
   
   > //load(basePath) use "/partitionKey=partitionValue" folder structure for 
Spark auto partition discovery
   
   Reading a table laid out as `partitionKey=partitionValue` should expose `partitionKey` as a column that can be used for partition pruning, just as `spark.read.parquet(basePath)` does.
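
Conceptually, Spark's partition discovery turns every `key=value` directory segment below the base path into a column. The sketch below only illustrates that mapping for one relative path; it is not Spark's implementation, the object name is hypothetical, and it skips type inference (Spark may infer numeric types for values such as `2021` unless `spark.sql.sources.partitionColumnTypeInference.enabled` is false) as well as path unescaping.

```scala
// Illustrative only: maps a Hive-style relative partition path to the
// (column -> value) pairs that partition discovery would expose.
// Values stay strings; segments without '=' are rejected.
object HiveStylePartitions {
  def parse(relativePath: String): Seq[(String, String)] =
    relativePath
      .split("/")
      .filter(_.nonEmpty)
      .toList
      .map { segment =>
        segment.split("=", 2) match {
          case Array(key, value) if key.nonEmpty => key -> value
          case _ =>
            throw new IllegalArgumentException(s"Not a key=value segment: '$segment'")
        }
      }
}
```

Applied to a `_partition` value such as `year=2021/month=11/day=12`, this yields the `year`, `month` and `day` columns described in the first part of this ticket.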
   
   In a shell with Hudi 0.8.0, the columns are available:
   ```scala
   scala> val basePath = "file:///tmp/hudi_trips_cow"
   basePath: String = file:///tmp/hudi_trips_cow
   
   scala> val tripsSnapshotDF = spark.
        |   read.
        |   format("hudi").
        |   load(basePath)
   21/11/12 18:23:34 WARN DefaultSource: Loading Base File Only View.
   21/11/12 18:23:34 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
   tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 16 more fields]
   
   scala> tripsSnapshotDF.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- begin_lat: double (nullable = true)
    |-- begin_lon: double (nullable = true)
    |-- driver: string (nullable = true)
    |-- end_lat: double (nullable = true)
    |-- end_lon: double (nullable = true)
    |-- fare: double (nullable = true)
    |-- partitionpath: string (nullable = true)
    |-- rider: string (nullable = true)
    |-- ts: long (nullable = true)
    |-- uuid: string (nullable = true)
    |-- continent: string (nullable = true)
    |-- country: string (nullable = true)
    |-- city: string (nullable = true)
   ```
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   
   * Spark version : 2.4.0
   
   * Hive version :
   
   * Hadoop version : 3.0.0
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   
   **Stacktrace**
   
   ```
   21/11/12 06:05:50 INFO util.ClusteringUtils: Found 0 files in pending clustering operations
   21/11/12 06:05:50 INFO view.AbstractTableFileSystemView: Building file system view for partition (year=2021/month=11/day=12)
   21/11/12 06:05:50 INFO view.AbstractTableFileSystemView: addFilesToView: NumFiles=15, NumFileGroups=9, FileGroupsCreationTime=12, StoreTimeTaken=1
   21/11/12 06:05:50 INFO io.HoodieMergeHandle: MaxMemoryPerPartitionMerge => 1073741824
   21/11/12 06:05:50 INFO collection.ExternalSpillableMap: Estimated Payload size => 768
   21/11/12 06:05:50 INFO collection.ExternalSpillableMap: New Estimated Payload size => 550
   21/11/12 06:05:57 INFO collection.BitCaskDiskMap: Spilling to file location /opt/cloudera/var/tmp/hudi-BITCASK-7980c51b-f1bb-46b2-806d-58bd72d7ec8f/09b0c846-6255-4407-acae-857c28ef1f82 in host (...) with hostname (...)
   21/11/12 06:07:27 INFO io.HoodieMergeHandle: Number of entries in MemoryBasedMap => 1561807Total size in bytes of MemoryBasedMap => 858993874Number of entries in BitCaskDiskMap => 2114256Size of file spilled to disk => 747196908
   21/11/12 06:07:27 INFO io.HoodieMergeHandle: partitionPath:year=2021/month=11/day=12, fileId to be merged:0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0
   21/11/12 06:07:27 INFO io.HoodieMergeHandle: Merging new data into oldPath /.../year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-52_20211112050651.parquet, as newPath /.../year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-89_20211112060041.parquet
   21/11/12 06:07:27 INFO marker.DirectWriteMarkers: Creating Marker Path=/.../.hoodie/.temp/20211112060041/year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-89_20211112060041.parquet.marker.MERGE
   21/11/12 06:07:27 INFO marker.DirectWriteMarkers: [direct] Created marker file /.../.hoodie/.temp/20211112060041/year=2021/month=11/day=12/0f1e66e7-3bb6-4f33-84d6-ca221f4d3f7b-0_0-20-89_20211112060041.parquet.marker.MERGE in 51 ms
   21/11/12 06:07:27 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
   21/11/12 06:07:27 INFO compress.CodecPool: Got brand-new compressor [.gz]
   21/11/12 06:07:28 INFO queue.IteratorBasedQueueProducer: starting to buffer records
   21/11/12 06:07:28 INFO queue.BoundedInMemoryExecutor: starting consumer thread
   21/11/12 06:07:28 INFO compress.CodecPool: Got brand-new decompressor [.gz]
   21/11/12 06:07:32 INFO queue.IteratorBasedQueueProducer: finished buffering records
   21/11/12 06:07:33 INFO queue.BoundedInMemoryExecutor: Queue Consumption is done; notifying producer threads
   21/11/12 06:09:01 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
   21/11/12 06:09:01 ERROR commit.BaseSparkCommitActionExecutor: Error upserting bucketType UPDATE for partition :0
   java.lang.NullPointerException
       at org.apache.hudi.common.util.SpillableMapUtils.readInternal(SpillableMapUtils.java:58)
       at org.apache.hudi.common.util.SpillableMapUtils.readBytesFromDisk(SpillableMapUtils.java:49)
       at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
       at org.apache.hudi.common.util.collection.LazyFileIterable$LazyFileIterator.next(LazyFileIterable.java:101)
       at org.apache.hudi.common.util.collection.ExternalSpillableMap$IteratorWrapper.next(ExternalSpillableMap.java:331)
       at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:350)
       at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:107)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:334)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:325)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:298)
       at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:156)
   ```
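
The `HoodieMergeHandle` summary line in the log above can be cross-checked with some arithmetic. The in-memory part of the spillable map stopped at about 80% of `MaxMemoryPerPartitionMerge` (1 GiB), at roughly the estimated 550 bytes per entry, and more than half of the roughly 3.7 million records of this file group were spilled to the on-disk `BitCaskDiskMap`. The snippet below only recomputes these ratios from the logged numbers (the object name is hypothetical); reading the ~0.8 ratio as a fixed sizing factor inside `ExternalSpillableMap` is an assumption.

```scala
// Recomputes ratios from the HoodieMergeHandle log lines above.
// All inputs are copied from the log; nothing here calls Hudi.
object MergeHandleLogStats {
  val maxMemoryPerPartitionMerge = 1073741824L // "MaxMemoryPerPartitionMerge => 1073741824"
  val estimatedPayloadSize       = 550L        // "New Estimated Payload size => 550"
  val inMemoryEntries            = 1561807L    // "Number of entries in MemoryBasedMap"
  val inMemoryBytes              = 858993874L  // "Total size in bytes of MemoryBasedMap"
  val spilledEntries             = 2114256L    // "Number of entries in BitCaskDiskMap"
  val spilledBytes               = 747196908L  // "Size of file spilled to disk"

  val totalEntries: Long           = inMemoryEntries + spilledEntries
  val spilledFraction: Double      = spilledEntries.toDouble / totalEntries
  val memoryFillRatio: Double      = inMemoryBytes.toDouble / maxMemoryPerPartitionMerge
  val bytesPerMemoryEntry: Double  = inMemoryBytes.toDouble / inMemoryEntries
  val bytesPerSpilledEntry: Double = spilledBytes.toDouble / spilledEntries
}
```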
   
   

