prashantwason opened a new issue, #18331:
URL: https://github.com/apache/hudi/issues/18331

   ## Problem
   
   Hive Sync partition operations (TOUCH, UPDATE, DROP) are significantly 
slower than necessary for tables with large partition counts (~1,500+ 
partitions per sync). Benchmarking against an equivalent system (HDrone) shows 
a **4.3x–9.2x slowdown** on incremental syncs of the same tables under 
identical conditions.
   
   ### Benchmark Results
   
   | Scenario | Partitions | Sequential (Hudi) | Parallel (HDrone) | Slowdown |
   |----------|-----------|-------------------|-------------------|----------|
   | Large table incremental | ~1,970 | 1,186s (19.8 min) | 208s (3.5 min) | **5.7x** |
   | Large table incremental | ~1,516 | 1,637s (27.3 min) | 178s (3.0 min) | **9.2x** |
   | Large table incremental | ~2,166 | 892s (14.9 min) | 206s (3.4 min) | **4.3x** |
   | Small table (~5 partitions) | ~5 | ~4s | ~3s | ~1x |
   
   For small partition counts the difference is negligible, but it becomes 
severe at scale.
   
   ## Root Causes
   
   ### 1. No batching for TOUCH/UPDATE/DROP partition operations
   
   The `hoodie.datasource.hive_sync.batch_num` config only applies to **ADD** 
partition operations. All other operations ignore it:
   
   - **TOUCH**: `constructTouchPartitions()` in `QueryBasedDDLExecutor` 
concatenates ALL partitions into a **single** `ALTER TABLE ... TOUCH 
PARTITION(...) PARTITION(...) ...` statement. For 2,000 partitions, this 
produces one massive SQL statement.
   - **UPDATE (SET LOCATION)**: `constructChangePartitions()` produces one SQL 
statement per partition but doesn't batch them.
   - **HMS mode**: `registerAlterPartitionEvent()` in `HMSDDLExecutor` sends 
ALL partitions in a single `client.alter_partitions()` Thrift call.
   
   **Relevant code** (`QueryBasedDDLExecutor.java`):
   ```java
   // TOUCH — no batching: ONE statement containing ALL partitions
   private List<String> constructTouchPartitions(String tableName, List<String> partitions) {
       StringBuilder alterTable = new StringBuilder("ALTER TABLE " + tableName + " TOUCH");
       for (String partition : partitions) {
           alterTable.append(" PARTITION (").append(getPartitionClause(partition)).append(")");
       }
       // Returns ONE statement containing all partitions
       return Collections.singletonList(alterTable.toString());
   }

   // Compare with ADD — properly batched
   private List<String> constructAddPartitions(String tableName, List<String> partitions) {
       List<String> result = new ArrayList<>();
       int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
       StringBuilder alterSQL = getAlterTablePrefix(tableName);
       for (int i = 0; i < partitions.size(); i++) {
           // ... append partition clause to alterSQL ...
           if ((i + 1) % batchSyncPartitionNum == 0) {
               result.add(alterSQL.toString());            // flush batch
               alterSQL = getAlterTablePrefix(tableName);  // start a new one
           }
       }
       // ... flush any trailing partial batch and return result ...
   }
   ```
   
   ### 2. No parallel execution of DDL statements
   
   Even when ADD partitions are batched into multiple SQL statements, they are 
executed **sequentially on a single thread**:
   
   ```java
   // HiveQueryDDLExecutor — sequential loop
   private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
       List<CommandProcessorResponse> responses = new ArrayList<>();
       for (String sql : sqls) {
           responses.add(hiveDriver.run(sql));  // blocks until complete, then next
       }
       return responses;
   }

   // JDBCExecutor — sequential loop
   public void runSQL(List<String> sqls) {
       sqls.forEach(this::runSQL);  // one statement at a time
   }
   ```
   
   The bottleneck is I/O-bound (network round-trips to the metastore RDBMS), 
not CPU-bound, so parallelism would significantly reduce wall-clock time.
   
   ## Proposed Fix
   
   ### 1. Add batching to TOUCH/UPDATE/DROP operations
   
   Extend `batch_num` to apply to all partition operations, not just ADD. For 
TOUCH, this means splitting one giant `ALTER TABLE TOUCH` into multiple smaller 
statements (e.g., 1,000 partitions each).
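
   For TOUCH, the change can be sketched as follows (a minimal standalone sketch; the real method lives in `QueryBasedDDLExecutor` and would read the batch size from `hoodie.datasource.hive_sync.batch_num` rather than take it as a parameter):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class TouchBatcher {
       // Sketch: split partitions into ALTER TABLE ... TOUCH statements of at
       // most batchSize PARTITION(...) clauses each, mirroring how
       // constructAddPartitions() already batches ADD operations.
       public static List<String> constructTouchPartitions(String tableName,
                                                           List<String> partitionClauses,
                                                           int batchSize) {
           List<String> statements = new ArrayList<>();
           StringBuilder alter = new StringBuilder("ALTER TABLE " + tableName + " TOUCH");
           int inBatch = 0;
           for (String clause : partitionClauses) {
               alter.append(" PARTITION (").append(clause).append(")");
               if (++inBatch == batchSize) {
                   statements.add(alter.toString());  // flush full batch
                   alter = new StringBuilder("ALTER TABLE " + tableName + " TOUCH");
                   inBatch = 0;
               }
           }
           if (inBatch > 0) {
               statements.add(alter.toString());  // flush trailing partial batch
           }
           return statements;
       }
   }
   ```

   With `batch_num=1000`, a 2,000-partition TOUCH becomes two statements instead of one oversized one.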
   
   ### 2. Add parallel DDL execution
   
   Execute batched DDL statements concurrently using a thread pool (default 8 
threads), similar to how HDrone processes partition operations.
   
   For thread safety:
   - **HiveQueryDDLExecutor**: Each worker thread creates its own 
`SessionState` + `Driver` (they are not thread-safe).
   - **JDBCExecutor**: Each worker thread creates its own JDBC `Connection`.
   - **HMSDDLExecutor**: `IMetaStoreClient` is a single Thrift connection and 
not thread-safe, so batching is applied sequentially (still helps avoid 
oversized Thrift requests and timeouts).
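
   Under those constraints, the execution pattern could look like this (a standalone sketch; `ParallelDdlRunner`, `sessionFactory`, and `execute` are illustrative names, not existing Hudi APIs):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.function.BiConsumer;
   import java.util.function.Supplier;

   public class ParallelDdlRunner {
       // Sketch: run DDL statements on a fixed-size pool. Each worker thread
       // lazily creates its OWN session object via ThreadLocal, because Hive's
       // SessionState/Driver and JDBC Connections are not thread-safe. A real
       // implementation must also close the per-thread sessions on shutdown.
       public static <S> void runParallel(List<String> sqls,
                                          int threads,
                                          Supplier<S> sessionFactory,
                                          BiConsumer<S, String> execute) {
           ExecutorService pool = Executors.newFixedThreadPool(threads);
           ThreadLocal<S> session = ThreadLocal.withInitial(sessionFactory);
           try {
               List<Future<?>> futures = new ArrayList<>();
               for (String sql : sqls) {
                   futures.add(pool.submit(() -> execute.accept(session.get(), sql)));
               }
               for (Future<?> f : futures) {
                   f.get();  // wait for completion; surface any failure
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new RuntimeException("Interrupted while running DDL", e);
           } catch (ExecutionException e) {
               throw new RuntimeException("DDL statement failed", e.getCause());
           } finally {
               pool.shutdown();
           }
       }
   }
   ```

   The per-executor difference is only in `sessionFactory`/`execute`: a new `SessionState` + `Driver` pair for `HiveQueryDDLExecutor`, a new `Connection` for `JDBCExecutor`.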
   
   ### 3. Feature flag for safe rollout
   
   Gate behind a new config `hoodie.datasource.hive_sync.batching.enabled` 
(default `false`) so existing behavior is completely unchanged unless 
explicitly opted in.
   
   ### New configs
   
   | Config | Default | Purpose |
   |--------|---------|---------|
   | `hoodie.datasource.hive_sync.batching.enabled` | `false` | Feature flag to enable batched + parallel partition sync |
   | `hoodie.datasource.hive_sync.batching.threads` | `8` | Thread pool size for parallel DDL execution |
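
   If adopted, opting in would look like this (keys from the table above; the new ones are proposed and do not exist in any released Hudi version):

   ```properties
   # Proposed feature flag + pool size
   hoodie.datasource.hive_sync.batching.enabled=true
   hoodie.datasource.hive_sync.batching.threads=8
   # Existing batch-size knob, which this proposal extends to TOUCH/UPDATE/DROP
   hoodie.datasource.hive_sync.batch_num=1000
   ```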
   
   ### Files to modify
   
   - `HiveSyncConfigHolder.java` — new configs
   - `QueryBasedDDLExecutor.java` — batch TOUCH/UPDATE, add parallel execution 
routing
   - `HiveQueryDDLExecutor.java` — parallel execution with per-thread Hive 
sessions
   - `JDBCExecutor.java` — parallel execution with per-thread JDBC connections
   - `HMSDDLExecutor.java` — sequential batching for alter_partitions
   
   ### Expected improvement
   
   Based on the benchmarks above, enabling batching + 8-thread parallelism should bring Hudi's partition sync performance in line with HDrone (~0.10 s/partition vs the current ~0.41–1.08 s/partition), reducing sync time for 2,000-partition tables from **~20 minutes to ~3 minutes**.

