prashantwason opened a new issue, #18331:
URL: https://github.com/apache/hudi/issues/18331
## Problem
Hive Sync partition operations (TOUCH, UPDATE, DROP) are significantly
slower than necessary for tables with large partition counts (~1,500+
partitions per sync). Benchmarking against an equivalent system (HDrone) shows
a **4.3x–9.2x slowdown** on incremental syncs of the same tables under
identical conditions.
### Benchmark Results
| Scenario | Partitions | Sequential (Hudi) | Parallel (HDrone) | Slowdown |
|----------|------------|-------------------|-------------------|----------|
| Large table incremental | ~1,970 | 1,186s (19.8 min) | 208s (3.5 min) | **5.7x** |
| Large table incremental | ~1,516 | 1,637s (27.3 min) | 178s (3.0 min) | **9.2x** |
| Large table incremental | ~2,166 | 892s (14.9 min) | 206s (3.4 min) | **4.3x** |
| Small table (~5 partitions) | ~5 | ~4s | ~3s | ~1x |
For small partition counts the difference is negligible, but it becomes
severe at scale.
## Root Causes
### 1. No batching for TOUCH/UPDATE/DROP partition operations
The `hoodie.datasource.hive_sync.batch_num` config only applies to **ADD**
partition operations. All other operations ignore it:
- **TOUCH**: `constructTouchPartitions()` in `QueryBasedDDLExecutor`
concatenates ALL partitions into a **single** `ALTER TABLE ... TOUCH
PARTITION(...) PARTITION(...) ...` statement. For 2,000 partitions, this
produces one massive SQL statement.
- **UPDATE (SET LOCATION)**: `constructChangePartitions()` produces one SQL
statement per partition but doesn't batch them.
- **HMS mode**: `registerAlterPartitionEvent()` in `HMSDDLExecutor` sends
ALL partitions in a single `client.alter_partitions()` Thrift call.
**Relevant code** (`QueryBasedDDLExecutor.java`):
```java
// TOUCH — no batching: builds ONE statement containing ALL partitions
private List<String> constructTouchPartitions(String tableName, List<String> partitions) {
  String alterTable = "ALTER TABLE " + tableName + " TOUCH";
  for (String partition : partitions) {
    alterTable += " PARTITION (" + getPartitionClause(partition) + ")";
  }
  // Returns ONE statement containing all partitions
  return Collections.singletonList(alterTable);
}

// Compare with ADD — properly batched
private List<String> constructAddPartitions(String tableName, List<String> partitions) {
  int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
  for (int i = 0; i < partitions.size(); i++) {
    // ... append partition to alterSQL ...
    if ((i + 1) % batchSyncPartitionNum == 0) {
      result.add(alterSQL.toString());           // flush batch
      alterSQL = getAlterTablePrefix(tableName); // start a new statement
    }
  }
}
```
### 2. No parallel execution of DDL statements
Even when ADD partitions are batched into multiple SQL statements, they are
executed **sequentially on a single thread**:
```java
// HiveQueryDDLExecutor — sequential loop
private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
for (String sql : sqls) {
hiveDriver.run(sql); // blocks until complete, then next
}
}
// JDBCExecutor — sequential loop
public void runSQL(List<String> sqls) {
sqls.forEach(this::runSQL); // one at a time
}
```
The bottleneck is I/O-bound (network round-trips to the metastore RDBMS),
not CPU-bound, so parallelism would significantly reduce wall-clock time.
## Proposed Fix
### 1. Add batching to TOUCH/UPDATE/DROP operations
Extend `batch_num` to apply to all partition operations, not just ADD. For
TOUCH, this means splitting one giant `ALTER TABLE TOUCH` into multiple smaller
statements (e.g., 1,000 partitions each).
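A minimal sketch of what batched TOUCH construction could look like. The `TouchBatcher` class name, the pre-rendered partition clauses, and the `batchSize` parameter are illustrative stand-ins, not Hudi's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class TouchBatcher {
  // Split one giant ALTER TABLE ... TOUCH into statements of at most
  // batchSize partitions each, mirroring how ADD is batched today.
  public static List<String> constructTouchPartitions(String tableName,
                                                      List<String> partitionClauses,
                                                      int batchSize) {
    List<String> statements = new ArrayList<>();
    StringBuilder sql = new StringBuilder("ALTER TABLE ").append(tableName).append(" TOUCH");
    int inBatch = 0;
    for (String clause : partitionClauses) {
      sql.append(" PARTITION (").append(clause).append(")");
      if (++inBatch == batchSize) {
        statements.add(sql.toString()); // flush full batch
        sql = new StringBuilder("ALTER TABLE ").append(tableName).append(" TOUCH");
        inBatch = 0;
      }
    }
    if (inBatch > 0) {
      statements.add(sql.toString()); // flush the remainder
    }
    return statements;
  }
}
```

With `batchSize` wired to `hoodie.datasource.hive_sync.batch_num`, 2,000 partitions would yield two 1,000-partition statements instead of one 2,000-partition statement.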
### 2. Add parallel DDL execution
Execute batched DDL statements concurrently using a thread pool (default 8
threads), similar to how HDrone processes partition operations.
For thread safety:
- **HiveQueryDDLExecutor**: Each worker thread creates its own
`SessionState` + `Driver` (they are not thread-safe).
- **JDBCExecutor**: Each worker thread creates its own JDBC `Connection`.
- **HMSDDLExecutor**: `IMetaStoreClient` is a single Thrift connection and
not thread-safe, so batching is applied sequentially (still helps avoid
oversized Thrift requests and timeouts).
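The parallel path for the first two executors could be sketched as below. The `ParallelDdlRunner` class is hypothetical; `runSql` stands in for the per-thread work (each worker would lazily create its own `SessionState`/`Driver` or JDBC `Connection`, e.g. via a `ThreadLocal`, before executing):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ParallelDdlRunner {
  // Run each batched DDL statement on a fixed-size pool and wait for all
  // of them, surfacing the first failure.
  public static void runParallel(List<String> sqls, int threads, Consumer<String> runSql) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (String sql : sqls) {
        futures.add(pool.submit(() -> runSql.accept(sql)));
      }
      for (Future<?> f : futures) {
        f.get(); // blocks until done; rethrows any DDL failure
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException("parallel DDL execution failed", e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Because each statement is mostly waiting on metastore round-trips, N workers should cut wall-clock time close to N-fold until the metastore itself saturates.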
### 3. Feature flag for safe rollout
Gate behind a new config `hoodie.datasource.hive_sync.batching.enabled`
(default `false`) so existing behavior is completely unchanged unless
explicitly opted in.
### New configs
| Config | Default | Purpose |
|--------|---------|---------|
| `hoodie.datasource.hive_sync.batching.enabled` | `false` | Feature flag to enable batched + parallel partition sync |
| `hoodie.datasource.hive_sync.batching.threads` | `8` | Thread pool size for parallel DDL execution |
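How the executors might read these configs, sketched with a plain `Properties` lookup as a stand-in for Hudi's typed config access (the key names are the proposed ones above; the `BatchingConfig` class is hypothetical):

```java
import java.util.Properties;

public class BatchingConfig {
  static final String ENABLED_KEY = "hoodie.datasource.hive_sync.batching.enabled";
  static final String THREADS_KEY = "hoodie.datasource.hive_sync.batching.threads";

  // Default false: existing behavior is untouched unless explicitly enabled.
  public static boolean batchingEnabled(Properties props) {
    return Boolean.parseBoolean(props.getProperty(ENABLED_KEY, "false"));
  }

  // Default 8 worker threads for parallel DDL execution.
  public static int batchingThreads(Properties props) {
    return Integer.parseInt(props.getProperty(THREADS_KEY, "8"));
  }
}
```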
### Files to modify
- `HiveSyncConfigHolder.java` — new configs
- `QueryBasedDDLExecutor.java` — batch TOUCH/UPDATE, add parallel execution
routing
- `HiveQueryDDLExecutor.java` — parallel execution with per-thread Hive
sessions
- `JDBCExecutor.java` — parallel execution with per-thread JDBC connections
- `HMSDDLExecutor.java` — sequential batching for alter_partitions
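For the HMS path, the sequential batching amounts to chunking the partition list before each `client.alter_partitions()` call. A hypothetical helper (not existing Hudi code):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionChunker {
  // Split a partition list into fixed-size chunks so HMSDDLExecutor can
  // issue one alter_partitions Thrift call per chunk instead of one call
  // carrying the full list (avoiding oversized requests and timeouts).
  public static <T> List<List<T>> chunk(List<T> items, int size) {
    List<List<T>> chunks = new ArrayList<>();
    for (int i = 0; i < items.size(); i += size) {
      chunks.add(items.subList(i, Math.min(i + size, items.size())));
    }
    return chunks;
  }
}
```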
### Expected improvement
Based on benchmarks, enabling batching + 8-thread parallelism should bring
Hudi's partition sync performance in line with HDrone (~0.10 s/partition vs
current ~0.60–1.08 s/partition), reducing sync time for 2,000-partition tables
from **~20 minutes to ~3 minutes**.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]