[ https://issues.apache.org/jira/browse/HUDI-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Selvaraj periyasamy updated HUDI-1372:
--------------------------------------
    Description: 
I am using Hudi 0.5.0.

 

I am trying to write into a COW table that has 350 columns. HoodieSparkSQLWriter 
takes more than 6 minutes to merge, and I don't see anything spilling to disk. 
Is there any tuning that would improve this? I have disabled combine-before-upsert 
via option("hoodie.combine.before.upsert","false"); setting it to true did not 
make much difference either.

20/11/05 07:43:37 INFO DefaultSource: Constructing hoodie (as parquet) data 
source with options :Map(hoodie.datasource.write.insert.drop.duplicates -> 
false, hoodie.datasource.hive_sync.database -> default, 
hoodie.parquet.small.file.limit -> 134217728, 
hoodie.copyonwrite.record.size.estimate -> 160, 
hoodie.insert.shuffle.parallelism -> 1000, path -> 
/projects/cdp/data/cdp_reporting/trr_test2, 
hoodie.datasource.write.precombine.field -> request_id, 
hoodie.datasource.hive_sync.partition_fields -> , 
hoodie.datasource.write.payload.class -> 
com.cybs.cdp.reporting.custom.CustomOverWriteWithLatestAvroPayload, 
hoodie.datasource.hive_sync.partition_extractor_class -> 
org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor, 
hoodie.parquet.max.file.size -> 268435456, 
hoodie.datasource.write.streaming.retry.interval.ms -> 2000, 
hoodie.datasource.hive_sync.table -> unknown, 
hoodie.datasource.write.streaming.ignore.failed.batch -> true, 
hoodie.datasource.write.operation -> upsert, hoodie.parquet.compression.codec 
-> snappy, hoodie.datasource.hive_sync.enable -> false, 
hoodie.datasource.write.recordkey.field -> request_id, 
hoodie.datasource.view.type -> read_optimized, hoodie.table.name -> trr2, 
hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://localhost:10000, 
hoodie.datasource.write.table.type -> COPY_ON_WRITE, 
hoodie.memory.merge.max.size -> 2004857600000, 
hoodie.datasource.write.storage.type -> COPY_ON_WRITE, hoodie.cleaner.policy -> 
KEEP_LATEST_FILE_VERSIONS, hoodie.datasource.hive_sync.username -> hive, 
hoodie.datasource.write.streaming.retry.count -> 3, 
hoodie.combine.before.upsert -> false, hoodie.datasource.hive_sync.password -> 
hive, hoodie.datasource.write.keygenerator.class -> 
org.apache.hudi.ComplexKeyGenerator, hoodie.keep.max.commits -> 3, 
hoodie.upsert.shuffle.parallelism -> 1000, 
hoodie.datasource.hive_sync.assume_date_partitioning -> false, 
hoodie.cleaner.commits.retained -> 1, hoodie.keep.min.commits -> 2, 
hoodie.datasource.write.partitionpath.field -> transaction_month, 
hoodie.datasource.write.commitmeta.key.prefix -> _, 
hoodie.index.bloom.num_entries -> 1500000)
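
One setting worth calling out from the options above: hoodie.memory.merge.max.size 
is 2004857600000 bytes, roughly 1.8 TB per merge. A quick unit conversion (a 
back-of-the-envelope Scala sketch, using only the value from the options map above):

// hoodie.memory.merge.max.size, copied from the options map above, in bytes.
val maxMergeMemoryBytes = 2004857600000L
val terabytes = maxMergeMemoryBytes / math.pow(1024, 4) // ≈ 1.82 TB

// A cap this large means the merge's spillable map is effectively unbounded,
// which is consistent with the executor log below reporting
// "Number of entries in DiskBasedMap => 0".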

 

 

Code snippet:

// Imports assumed for the option-key constants used below (standard
// Hudi/Spark imports; not shown in the original snippet):
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.spark.sql.SaveMode.Append

trrDF.write.format("org.apache.hudi").
 option("hoodie.insert.shuffle.parallelism", "1000").
 option("hoodie.upsert.shuffle.parallelism", "1000").
 option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
 option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
 option(PRECOMBINE_FIELD_OPT_KEY, "request_id").
 option("hoodie.memory.merge.max.size", "2004857600000").
 option(PARTITIONPATH_FIELD_OPT_KEY, "transaction_month").
 option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.ComplexKeyGenerator").
 option(PAYLOAD_CLASS_OPT_KEY, "com.cybs.cdp.reporting.custom.CustomOverWriteWithLatestAvroPayload").
 option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS").
 option("hoodie.cleaner.commits.retained", "1").
 option("hoodie.keep.min.commits", "2").
 option("hoodie.keep.max.commits", "3").
 option("hoodie.index.bloom.num_entries", "1500000").
 option("hoodie.copyonwrite.record.size.estimate", "160").
 option("hoodie.parquet.max.file.size", String.valueOf(256 * 1024 * 1024)).
 option("hoodie.parquet.small.file.limit", String.valueOf(128 * 1024 * 1024)).
 option("hoodie.parquet.compression.codec", "snappy").
 *option("hoodie.combine.before.upsert", "false").*
 option(RECORDKEY_FIELD_OPT_KEY, "request_id").
 option(TABLE_NAME, "trr2").
 mode(Append).
 save("/projects/cdp/data/cdp_reporting/trr_test2")
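
The executor log further down also lets us cross-check the in-memory map sizing 
(a rough calculation only; every figure is copied verbatim from the log):

// Figures copied from the executor log below.
val entriesInMemoryMap = 1476470L   // "Number of entries in MemoryBasedMap"
val estimatedPayloadBytes = 1688L   // "New Estimated Payload size"
val reportedMapBytes = 2492281440L  // "Total size in bytes of MemoryBasedMap"

// entries * estimated payload ≈ reported total, i.e. all ~1.48 M records
// (about 2.3 GB) stayed in memory and nothing spilled to disk.
val computedMapBytes = entriesInMemoryMap * estimatedPayloadBytes // 2492281360

// Also note the gap between the configured estimate and what the estimator saw:
// hoodie.copyonwrite.record.size.estimate is set to 160 bytes, while the log
// reports "SizeOfRecord => 2552".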

 

!image-2020-11-04-23-54-12-187.png!

 

 

Tasks corresponding to stage 20:

 

!image-2020-11-04-23-58-00-554.png!

 

Logs from one of the executors:

 

 

20/11/05 06:17:21 INFO TorrentBroadcast: Started reading broadcast variable 20
 20/11/05 06:17:21 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes 
in memory (estimated size 87.2 KB, free 12.1 GB)
 20/11/05 06:17:21 INFO TorrentBroadcast: Reading broadcast variable 20 took 4 
ms
 20/11/05 06:17:21 INFO MemoryStore: Block broadcast_20 stored as values in 
memory (estimated size 239.4 KB, free 12.1 GB)
 20/11/05 06:17:21 INFO MapOutputTrackerWorker: Don't have map outputs for 
shuffle 8, fetching them
 20/11/05 06:17:21 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
endpoint = 
NettyRpcEndpointRef(spark://[email protected]:33406)
 20/11/05 06:17:21 INFO MapOutputTrackerWorker: Got the output locations
 20/11/05 06:17:21 INFO ShuffleBlockFetcherIterator: Getting 1000 non-empty 
blocks out of 1000 blocks
 20/11/05 06:17:21 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches 
in 3 ms
 20/11/05 06:17:22 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:22 INFO HoodieMergeHandle: MaxMemoryPerPartitionMerge => 
2004857600000
 20/11/05 06:17:22 INFO DiskBasedMap: Spilling to file location 
/tmp/19d435cf-e581-4b80-b286-9aa092587c6f in host (10.160.39.146) with hostname 
(sl73caehdn0708.visa.com)
 20/11/05 06:17:22 INFO HoodieRecordSizeEstimator: SizeOfRecord => 2552 
SizeOfSchema => 273456
 20/11/05 06:17:22 INFO ExternalSpillableMap: Estimated Payload size => 2664
 20/11/05 06:17:22 INFO ExternalSpillableMap: New Estimated Payload size => 1688
 20/11/05 06:17:37 INFO HoodieMergeHandle: Number of entries in MemoryBasedMap 
=> 1476470Total size in bytes of MemoryBasedMap => 2492281440Number of entries 
in DiskBasedMap => 0Size of file spilled to disk => 0
 20/11/05 06:17:37 INFO FileSystemViewManager: Creating View Manager with 
storage type :MEMORY
 20/11/05 06:17:37 INFO FileSystemViewManager: Creating in-memory based Table 
View
 20/11/05 06:17:37 INFO FileSystemViewManager: Creating InMemory based view for 
basePath /projects/cdp/data/cdp_reporting/trr_test2
 20/11/05 06:17:37 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient 
from /projects/cdp/data/cdp_reporting/trr_test2
 20/11/05 06:17:37 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:37 INFO HoodieTableConfig: Loading dataset properties from 
/projects/cdp/data/cdp_reporting/trr_test2/.hoodie/hoodie.properties
 20/11/05 06:17:37 INFO HoodieTableMetaClient: Finished Loading Table of type 
COPY_ON_WRITE from /projects/cdp/data/cdp_reporting/trr_test2
 20/11/05 06:17:37 INFO HoodieTableMetaClient: Loading Active commit timeline 
for /projects/cdp/data/cdp_reporting/trr_test2
 20/11/05 06:17:37 INFO HoodieActiveTimeline: Loaded instants 
java.util.stream.ReferencePipeline$Head@7579056
 20/11/05 06:17:37 INFO AbstractTableFileSystemView: Building file system view 
for partition (202010)
 20/11/05 06:17:37 INFO AbstractTableFileSystemView: #files found in partition 
(202010) =27, Time taken =1
 20/11/05 06:17:37 INFO HoodieTableFileSystemView: Adding file-groups for 
partition :202010, #FileGroups=8
 20/11/05 06:17:37 INFO AbstractTableFileSystemView: addFilesToView: 
NumFiles=27, FileGroupsCreationTime=2, StoreTimeTaken=0
 20/11/05 06:17:37 INFO AbstractTableFileSystemView: Time to load partition 
(202010) =4
 20/11/05 06:17:37 INFO HoodieMergeHandle: partitionPath:202010, fileId to be 
merged:3a404978-eaad-4825-b88a-dc24fff0c623-0
 20/11/05 06:17:37 INFO HoodieMergeHandle: Merging new data into oldPath 
/projects/cdp/data/cdp_reporting/trr_test2/202010/3a404978-eaad-4825-b88a-dc24fff0c623-0_3-24-10680_20201105055955.parquet,
 as newPath 
/projects/cdp/data/cdp_reporting/trr_test2/202010/3a404978-eaad-4825-b88a-dc24fff0c623-0_4-24-10681_20201105061418.parquet
 20/11/05 06:17:37 INFO HoodieWriteHandle: Creating Marker 
Path=/projects/cdp/data/cdp_reporting/trr_test2/.hoodie/.temp/20201105061418/202010/3a404978-eaad-4825-b88a-dc24fff0c623-0_4-24-10681_20201105061418.marker
 20/11/05 06:17:37 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:37 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:38 INFO CodecPool: Got brand-new compressor [.snappy]
 20/11/05 06:17:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:38 INFO FSUtils: Hadoop Configuration: fs.defaultFS: 
[hdfs://oprhqanameservice], Config:[Configuration: ], FileSystem: 
[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1395542441_138, ugi=svchdc36q 
(auth:SIMPLE)]]]
 20/11/05 06:17:42 INFO IteratorBasedQueueProducer: starting to buffer records
 20/11/05 06:17:42 INFO BoundedInMemoryExecutor: starting consumer thread
 *20/11/05 06:17:42 INFO CodecPool: Got brand-new decompressor [.snappy]*
 *20/11/05 06:23:41 INFO IteratorBasedQueueProducer: finished buffering records*
 *20/11/05 06:23:41 INFO BoundedInMemoryExecutor: Queue Consumption is done; 
notifying producer threads*
 20/11/05 06:23:48 INFO HoodieMergeHandle: MergeHandle for partitionPath 202010 
fileID 3a404978-eaad-4825-b88a-dc24fff0c623-0, took 385963 ms.
 20/11/05 06:23:48 INFO MemoryStore: Block rdd_59_4 stored as bytes in memory 
(estimated size 304.0 B, free 12.1 GB)
 20/11/05 06:23:48 INFO Executor: Finished task 4.0 in stage 24.0 (TID 10681). 
1010 bytes result sent to driver
 20/11/05 06:23:49 INFO CoarseGrainedExecutorBackend: Got assigned task 10686
 20/11/05 06:23:49 INFO Executor: Running task 4.0 in stage 30.0 (TID 10686)
 20/11/05 06:23:49 INFO TorrentBroadcast: Started reading broadcast variable 21
 20/11/05 06:23:49 INFO MemoryStore: Block broadcast_21_piece0 stored as bytes 
in memory (estimated size 87.2 KB, free 12.1 GB)
 20/11/05 06:23:49 INFO TorrentBroadcast: Reading broadcast variable 21 took 4 
ms
 20/11/05 06:23:49 INFO MemoryStore: Block broadcast_21 stored as values in 
memory (estimated size 239.6 KB, free 12.1 GB)
 20/11/05 06:23:50 INFO BlockManager: Found block rdd_59_4 locally
 20/11/05 06:23:50 INFO Executor: Finished task 4.0 in stage 30.0 (TID 10686). 
1103 bytes result sent to driver
 20/11/05 06:23:51 INFO CoarseGrainedExecutorBackend: Driver commanded a 
shutdown
 20/11/05 06:23:51 INFO MemoryStore: MemoryStore cleared
 20/11/05 06:23:51 INFO BlockManager: BlockManager stopped
 20/11/05 06:23:51 INFO ShutdownHookManager: Shutdown hook called
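
Putting rough numbers on the timestamps above (simple arithmetic on the logged 
times; only the 385963 ms figure is reported by Hudi itself):

// Buffering ran from 06:17:42 to 06:23:41 per the log above.
val bufferingSeconds = 5 * 60 + 59                 // 359 s
val records = 1476470L                             // entries in MemoryBasedMap
val recordsPerSecond = records / bufferingSeconds  // ≈ 4112 records/s

// The MergeHandle reports 385963 ms end to end for this one file group:
val mergeMinutes = 385963.0 / 1000 / 60            // ≈ 6.4 minutes

So nearly all of the ~6.4 minutes falls between the "starting to buffer records" 
and "finished buffering records" lines, not in disk spilling.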

 

 

> BoundedInMemoryExecutor takes 6 mins for 1.5 M records per file
> ---------------------------------------------------------------
>
>                 Key: HUDI-1372
>                 URL: https://issues.apache.org/jira/browse/HUDI-1372
>             Project: Apache Hudi
>          Issue Type: Task
>            Reporter: Selvaraj periyasamy
>            Priority: Major
>         Attachments: image-2020-11-04-23-54-12-187.png, 
> image-2020-11-04-23-58-00-554.png, image-2020-11-05-00-00-24-066.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
