hudi-agent commented on code in PR #19033:
URL: https://github.com/apache/hudi/pull/19033#discussion_r3426437347
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -597,6 +642,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
public void close() {
try {
ddlExecutor.close();
Review Comment:
🤖 If `ddlExecutor.close()` throws, control jumps to the outer `catch
(Exception e) { log.error(...) }` (which doesn't rethrow), so
`partitionClientPool.close()` never runs and the Thrift sockets + worker
threads leak silently. Could you wrap the pool cleanup in a try-finally so it
runs regardless of whether `ddlExecutor.close()` succeeds? E.g. `try {
ddlExecutor.close(); } finally { if (partitionClientPool != null) { ... } }`.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
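Below is a minimal sketch of the try-finally ordering suggested above. `CloseOrderingSketch` and `Resource` are hypothetical stand-ins for `HoodieHiveSyncClient`, `DDLExecutor` and `IMetaStoreClientPool`, not the real Hudi types. The outer catch keeps today's log-and-swallow policy and simply records the message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not the real HoodieHiveSyncClient / DDLExecutor / IMetaStoreClientPool.
public class CloseOrderingSketch {

  /** Minimal closeable that records whether it was closed and can be told to fail. */
  static final class Resource implements AutoCloseable {
    final String name;
    final boolean failOnClose;
    boolean closed;

    Resource(String name, boolean failOnClose) {
      this.name = name;
      this.failOnClose = failOnClose;
    }

    @Override
    public void close() {
      closed = true;
      if (failOnClose) {
        throw new IllegalStateException(name + " failed to close");
      }
    }
  }

  final Resource ddlExecutor;
  final Resource partitionClientPool; // null when batching is disabled
  final List<String> loggedErrors = new ArrayList<>();

  CloseOrderingSketch(Resource ddlExecutor, Resource partitionClientPool) {
    this.ddlExecutor = ddlExecutor;
    this.partitionClientPool = partitionClientPool;
  }

  public void close() {
    try {
      try {
        ddlExecutor.close();
      } finally {
        // Runs even when ddlExecutor.close() throws, so pooled sockets/threads are released.
        if (partitionClientPool != null) {
          partitionClientPool.close();
        }
      }
    } catch (Exception e) {
      // Same policy as today: log and swallow rather than rethrow.
      loggedErrors.add(e.getMessage());
    }
  }
}
```

One caveat: if both `close()` calls throw, the exception from the `finally` block replaces the executor's exception. If both failures matter, the second one could be attached to the first with `addSuppressed`.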
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -86,6 +90,16 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
private final Map<String, Table> initialTableByName = new HashMap<>();
DDLExecutor ddlExecutor;
private IMetaStoreClient client;
+ // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HMS or HIVEQL.
+ // Owned by this class; closed in close() before Hive.closeCurrent(). Hands clients
+ // to the active DDL executor for partition-row work only — see IMetaStoreClientPool
+ // javadoc. HMSDDLExecutor uses it for add/alter/drop; HiveQueryDDLExecutor uses it
+ // only for DROP (Hive Thrift, not Hive Driver).
+ private IMetaStoreClientPool partitionClientPool;
+ // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit
+ // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for
+ // reference only — close() is delegated through ddlExecutor.close().
+ private Option<HiveDriverPool> partitionDriverPool = Option.empty();
Review Comment:
🤖 nit: the comment says this field is 'kept for reference only — close() is
delegated through ddlExecutor.close()', which means it's never read after being
passed into `HiveQueryDDLExecutor`. Could you make it a local variable in the
constructor instead? A field that isn't read after construction will make
future readers wonder what lifecycle role it plays.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
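Here is a sketch of the suggested refactor. It assumes the executor takes ownership of the pool and closes it in its own `close()`. `LocalOwnershipSketch`, `DriverPool`, `QueryExecutor` and `SyncClient` are hypothetical names. `null` stands in for the PR's `Option.empty()` to keep the example dependency-free.

```java
// Hypothetical types; the real ones would be HiveDriverPool, HiveQueryDDLExecutor
// and HoodieHiveSyncClient.
public class LocalOwnershipSketch {

  static final class DriverPool implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
      closed = true;
    }
  }

  static final class QueryExecutor implements AutoCloseable {
    final DriverPool pool; // null when batching is disabled; owned by this executor

    QueryExecutor(DriverPool pool) {
      this.pool = pool;
    }

    @Override
    public void close() {
      if (pool != null) {
        pool.close();
      }
    }
  }

  static final class SyncClient implements AutoCloseable {
    final QueryExecutor ddlExecutor;

    SyncClient(boolean batchingEnabled) {
      // Local variable, not a field: the client never reads it after the hand-off.
      DriverPool driverPool = batchingEnabled ? new DriverPool() : null;
      this.ddlExecutor = new QueryExecutor(driverPool);
    }

    @Override
    public void close() {
      // Pool lifecycle is delegated entirely to the executor.
      ddlExecutor.close();
    }
  }
}
```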
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java:
##########
@@ -291,21 +312,71 @@ public void close() {
private void registerAlterPartitionEvent(String tableName, List<String> alteredPartitions) {
try {
+ // Read the StorageDescriptor once on the session client; each worker deep-copies
+ // it per partition (alter_partitions semantics today).
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
- List<Partition> partitionList = alteredPartitions.stream().map(partition -> {
- Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition);
- String partitionScheme = partitionPath.toUri().getScheme();
- String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
- ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString();
- List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
- StorageDescriptor partitionSd = sd.deepCopy();
- partitionSd.setLocation(fullPartitionPath);
- return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
- }).collect(Collectors.toList());
- client.alter_partitions(databaseName, tableName, partitionList, null);
- } catch (TException e) {
+ int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+ List<List<String>> batches = CollectionUtils.batches(alteredPartitions, batchSyncPartitionNum);
+ runBatches("alter", tableName, batches, (poolClient, batch) -> {
+ List<Partition> partitionList = batch.stream().map(partition -> {
+ Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(syncConfig.getString(META_SYNC_BASE_PATH), partition);
+ String partitionScheme = partitionPath.toUri().getScheme();
+ String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
+ ? HadoopFSUtils.getDFSFullPartitionPath(syncConfig.getHadoopFileSystem(), partitionPath) : partitionPath.toString();
+ List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+ StorageDescriptor partitionSd = sd.deepCopy();
+ partitionSd.setLocation(fullPartitionPath);
+ return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
+ }).collect(Collectors.toList());
+ poolClient.alter_partitions(databaseName, tableName, partitionList, null);
+ });
+ } catch (Exception e) {
log.error("{}.{} update partition failed", databaseName, tableName, e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " update partition failed", e);
}
}
+
+ /**
+ * Dispatches partition batches either in parallel against the client pool (if
+ * configured) or sequentially against the session client. The sequential path
+ * preserves the exact behavior we had before the pool existed, including failure
+ * semantics where batch N+1 never runs if batch N throws.
+ */
+ @FunctionalInterface
+ private interface BatchAction {
+ void apply(IMetaStoreClient client, List<String> batch) throws Exception;
+ }
+
+ private void runBatches(String opName, String tableName, List<List<String>> batches, BatchAction action) throws Exception {
+ if (partitionClientPool == null) {
+ for (List<String> batch : batches) {
+ action.apply(client, batch);
+ }
+ return;
+ }
+ List<Future<Void>> futures = new ArrayList<>(batches.size());
+ for (List<String> batch : batches) {
+ futures.add(partitionClientPool.executor().submit(() ->
+ partitionClientPool.run(poolClient -> {
+ action.apply(poolClient, batch);
Review Comment:
🤖 When a Future fails, `f.get()` throws `ExecutionException` wrapping the
real cause. `firstError` then stores the `ExecutionException`, which gets
rethrown and caught by the outer `catch (Exception e)` in
`addPartitionsToTable`/`dropPartitionsToTable`/`registerAlterPartitionEvent`,
which wraps it again in `HoodieHiveSyncException`. The result is
`HoodieHiveSyncException -> ExecutionException -> RealCause` — one more
wrapping layer than the sequential path produced. HiveDriverPool.awaitAll
already has an `unwrap()` helper; could we do the same here for consistency?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
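This is a minimal sketch of the suggested unwrapping. It reuses the same rule as `HiveDriverPool.unwrap`: prefer the task's own `Exception`, and fall back to the `ExecutionException` only when the cause is an `Error`. `UnwrapSketch` and `awaitFirstError` are hypothetical names. Cancellation handling is left out to keep the focus on the wrapping depth.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper names; not part of the PR.
public class UnwrapSketch {

  // Same rule as HiveDriverPool.unwrap: surface the task's own Exception, and fall
  // back to the ExecutionException only when the cause is an Error.
  static Exception unwrap(ExecutionException ee) {
    Throwable cause = ee.getCause();
    return (cause instanceof Exception) ? (Exception) cause : ee;
  }

  // Awaits every batch future and rethrows the first failure's real cause, so the
  // caller's catch block adds exactly one HoodieHiveSyncException layer, as on the
  // sequential path. InterruptedException propagates to the caller unchanged.
  static void awaitFirstError(List<Future<Void>> futures) throws Exception {
    Exception firstError = null;
    for (Future<Void> f : futures) {
      try {
        f.get();
      } catch (ExecutionException ee) {
        if (firstError == null) {
          firstError = unwrap(ee);
        }
      }
    }
    if (firstError != null) {
      throw firstError;
    }
  }
}
```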
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+
+/**
+ * Pool of Hive {@link Driver} + {@link SessionState} pairs for parallel HiveQL DDL.
+ *
+ * <p>Hive's {@code SessionState.start(state)} binds state to the calling thread's
+ * thread-local, and {@code Driver} reads from that thread-local during {@code run()}.
+ * A Driver constructed on one thread cannot be safely used from another. This pool
+ * solves that by giving each slot its own dedicated worker thread (a single-thread
+ * executor) — the Driver and SessionState are built on that thread by a bootstrap
+ * task, and all subsequent SQL for that slot runs on the same thread.
+ *
+ * <p><b>Usage contract:</b> use this pool only for partition-row DDL statements that
+ * are independent of each other and freely shuffleable across workers. Table-level
+ * statements (createTable, schema evolution, USE database) must continue to run on
+ * the session {@code Driver} held by {@code HiveQueryDDLExecutor} on the sync driver
+ * thread. The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled}
+ * and is constructed only for HiveQL sync mode.
+ */
+public class HiveDriverPool implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class);
+
+ // Per-worker Driver construction has to be fast in practice (a few hundred ms
+ // for the SessionState + Driver init). A 60s ceiling per worker leaves plenty of
+ // headroom for a slow JVM warm-up but bounds the failure mode if the metastore
+ // is unreachable or Hive hangs during init.
+ private static final long BOOTSTRAP_TIMEOUT_SECONDS = 60;
+
+ private final List<Worker> workers;
+ private final int size;
+ private volatile boolean closed;
+
+ public HiveDriverPool(HiveSyncConfig config, int size) {
+ this(config, size, new DefaultDriverFactory(config));
+ }
+
+ // Package-private for tests: accepts a DriverFactory so unit tests can inject
+ // mock Driver instances without standing up a real Hive instance.
+ HiveDriverPool(HiveSyncConfig config, int size, DriverFactory factory) {
+ if (size < 1) {
+ throw new IllegalArgumentException("Pool size must be >= 1, got " + size);
+ }
+ this.size = size;
+ this.workers = new ArrayList<>(size);
+ String databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
+ PoolThreadFactory threadFactory = new PoolThreadFactory();
+ List<Future<Void>> bootstrapFutures = new ArrayList<>(size);
+ try {
+ for (int i = 0; i < size; i++) {
+ Worker worker = new Worker(threadFactory);
+ workers.add(worker);
+ bootstrapFutures.add(worker.executor.submit(() -> {
+ worker.driver = factory.newDriver(databaseName);
+ return null;
+ }));
+ }
+ // Block until all bootstraps complete so we surface construction errors
+ // before any caller hands us SQL. Bounded by BOOTSTRAP_TIMEOUT_SECONDS so a
+ // hung Hive init doesn't deadlock the sync driver thread.
+ for (Future<Void> f : bootstrapFutures) {
+ f.get(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+ } catch (Exception e) {
+ tearDown();
+ throw new HoodieException("Failed to construct HiveDriverPool of size " + size, e);
+ }
+ LOG.info("Initialized HiveDriverPool with {} workers", size);
+ }
+
+ /**
+ * Runs each given SQL on <i>every</i> worker, in order. Used for setup statements
+ * (e.g. {@code USE database}) that must establish per-thread session context
+ * before any partition statement runs. Blocks until all workers have completed
+ * the setup. Throws on first error.
+ */
+ public void runOnEachWorker(List<String> setupSqls) {
+ if (closed) {
+ throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool");
+ }
+ if (setupSqls.isEmpty()) {
+ return;
+ }
+ List<Future<?>> futures = new ArrayList<>(workers.size());
+ for (Worker worker : workers) {
+ futures.add(worker.executor.submit(() -> {
+ for (String sql : setupSqls) {
+ worker.driver.run(sql);
+ }
+ return null;
+ }));
+ }
+ awaitAll(futures);
+ }
+
+ /**
+ * Dispatches each SQL string to a worker (round-robin) and returns the list of
+ * futures. The caller is responsible for awaiting and collecting errors. SQL text
+ * is intentionally not logged per-statement here: batched TOUCH/ADD statements can
+ * be many kilobytes, and N parallel workers would multiply the log volume. See
+ * {@link #awaitAll(List)} for the per-call summary log.
+ */
+ public List<Future<?>> runAll(List<String> sqls) {
+ if (closed) {
+ throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool");
+ }
+ List<Future<?>> futures = new ArrayList<>(sqls.size());
+ for (int i = 0; i < sqls.size(); i++) {
+ String sql = sqls.get(i);
+ Worker worker = workers.get(i % workers.size());
+ futures.add(worker.executor.submit(() -> {
+ worker.driver.run(sql);
+ return null;
+ }));
+ }
+ return futures;
+ }
+
+ /**
+ * Awaits all futures and throws the first exception encountered. On first failure,
+ * cancels the remaining (not yet started) futures so workers don't keep running
+ * pointless work after a fatal error. Any errors that finished before cancellation
+ * are logged at WARN. Callers do not need per-statement results (Hive's Driver.run
+ * side-effects the metastore), so this method is void.
+ */
+ public void awaitAll(List<Future<?>> futures) {
+ long start = System.currentTimeMillis();
+ Exception firstError = null;
+ int completed = 0;
+ int cancelled = 0;
+ for (int i = 0; i < futures.size(); i++) {
+ Future<?> f = futures.get(i);
+ try {
+ f.get();
+ completed++;
+ } catch (CancellationException ce) {
+ // We cancelled this future ourselves after a prior error. Don't treat it
+ // as a new failure; just note it for the summary log.
+ cancelled++;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ if (firstError == null) {
+ firstError = ie;
+ cancelled += cancelRemaining(futures, i + 1);
+ }
+ } catch (ExecutionException ee) {
+ Exception cause = unwrap(ee);
+ if (firstError == null) {
+ firstError = cause;
+ cancelled += cancelRemaining(futures, i + 1);
+ } else {
+ LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause);
+ }
+ }
+ }
+ if (firstError != null) {
+ throw new HoodieHiveSyncException("Failed in executing SQL", firstError);
+ }
+ LOG.info("Completed {} SQL statements ({} cancelled) in {} ms across {} workers",
+ completed, cancelled, System.currentTimeMillis() - start, size);
+ }
+
+ private static int cancelRemaining(List<Future<?>> futures, int fromIndex) {
+ int cancelled = 0;
+ for (int j = fromIndex; j < futures.size(); j++) {
+ // mayInterruptIfRunning=false: the worker thread is bound to a Hive Driver
+ // whose state we don't want to corrupt mid-statement. Cancel only those that
+ // haven't started yet; in-flight statements run to completion.
+ if (futures.get(j).cancel(false)) {
+ cancelled++;
+ }
+ }
+ return cancelled;
+ }
+
+ private static Exception unwrap(ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ return (cause instanceof Exception) ? (Exception) cause : ee;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ tearDown();
+ }
+
+ private void tearDown() {
+ // Close each worker's Driver/SessionState on its own thread, then shut the
+ // executor down. Running close() on the bound thread keeps SessionState's
+ // thread-local cleanup correct.
+ for (Worker worker : workers) {
+ try {
+ worker.executor.submit(() -> {
+ if (worker.driver != null) {
+ try {
+ worker.driver.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing pooled Driver", e);
+ }
+ }
+ SessionState ss = SessionState.get();
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing pooled SessionState", e);
+ }
+ }
+ return null;
+ }).get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.warn("Error during pool worker shutdown", e);
+ }
+ worker.executor.shutdown();
+ try {
+ if (!worker.executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ worker.executor.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ worker.executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+ workers.clear();
+ }
+
+ /**
+ * Per-slot state: a single-thread executor and the Driver bound to its thread.
+ * Driver is volatile because it is written by the bootstrap task and read by
+ * subsequent dispatch tasks on the same executor.
+ */
+ private static final class Worker {
+ final ExecutorService executor;
+ volatile Driver driver;
+
+ Worker(ThreadFactory threadFactory) {
+ this.executor = Executors.newSingleThreadExecutor(threadFactory);
+ }
+ }
+
+ @FunctionalInterface
+ interface DriverFactory {
+ Driver newDriver(String databaseName) throws Exception;
+ }
+
+ /**
+ * Builds a real Hive {@link Driver} on the calling thread. The SessionState is
+ * constructed lazily (once, on the first worker thread that builds a Driver) and
+ * shared across all worker threads — Hive uses ThreadLocal attachment, not
+ * exclusive ownership, so multiple workers calling
+ * {@code SessionState.start(sharedState)} all see the same config and scratch dir
+ * without each spending the cost of building their own SessionState (and risking
+ * resource-dir creation races during the constructor).
+ */
+ private static final class DefaultDriverFactory implements DriverFactory {
+ private final HiveConf hiveConf;
+ private volatile SessionState sharedSessionState;
+
+ DefaultDriverFactory(HiveSyncConfig config) {
+ this.hiveConf = config.getHiveConf();
+ }
+
+ @Override
+ public synchronized Driver newDriver(String databaseName) throws Exception {
+ // SessionState is shared across workers; build it once on the first call (with
+ // currentDatabase already set) and attach it to each worker's thread-local on
+ // subsequent calls. The database is a pool-wide property and never changes
+ // across workers, so setting it once at construction time is sufficient.
Review Comment:
🤖 The shared SessionState pattern here makes me a bit nervous — every worker
thread calls `SessionState.start(sharedSessionState)` and then runs concurrent
`Driver.run()` against the same instance. Hive's SessionState has plenty of
mutable fields the Driver writes during query execution (e.g. `currentDatabase`
via USE, `hiveOperation`, `cmd`, `lineageState`, `tablesAlreadyMarkedAsRead`).
Are we confident none of these get written concurrently by parallel ALTER
PARTITION / TOUCH statements? @yihua could you weigh in on whether sharing
SessionState across worker threads is safe in practice, or whether each worker
should construct its own (paying the SessionState init cost in exchange for
isolation)?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
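If sharing turns out to be unsafe, the per-worker alternative keeps each session confined to the thread that built it. The sketch below models that with a hypothetical `FakeSession`, whose `start`/`get` mimic the thread-local binding of `SessionState.start`/`SessionState.get`. It is not Hive code, and it says nothing about the init cost that the shared design is trying to avoid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of per-worker session isolation; FakeSession is not Hive's SessionState.
public class PerWorkerSessionSketch {

  static final class FakeSession {
    private static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();
    String currentDatabase; // mutated by USE, like SessionState
    String lastCommand;     // mutated per statement, like SessionState's cmd

    static void start(FakeSession s) {
      CURRENT.set(s);
    }

    static FakeSession get() {
      return CURRENT.get();
    }
  }

  final List<ExecutorService> workers = new ArrayList<>();

  PerWorkerSessionSketch(int size, String database) throws Exception {
    List<Future<?>> bootstraps = new ArrayList<>();
    for (int i = 0; i < size; i++) {
      ExecutorService worker = Executors.newSingleThreadExecutor();
      workers.add(worker);
      // Each worker builds and binds its OWN session on its own thread; nothing is shared.
      bootstraps.add(worker.submit(() -> {
        FakeSession s = new FakeSession();
        s.currentDatabase = database;
        FakeSession.start(s);
      }));
    }
    for (Future<?> f : bootstraps) {
      f.get(60, TimeUnit.SECONDS);
    }
  }

  /** Runs a "statement" on worker i; it only ever touches that worker's session. */
  FakeSession run(int i, String sql) throws Exception {
    return workers.get(i).submit(() -> {
      FakeSession s = FakeSession.get();
      s.lastCommand = sql;
      return s;
    }).get();
  }

  void close() {
    workers.forEach(ExecutorService::shutdown);
  }
}
```

The trade-off is the one raised in the comment: N session constructions at bootstrap in exchange for never having two threads write the same mutable session fields.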
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java:
##########
@@ -132,23 +196,81 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro
log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName);
try {
- for (String dropPartition : partitionsToDrop) {
- if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor,
- config)) {
- String partitionClause =
- HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
- metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false);
- }
- log.info("Drop partition {} on {}", dropPartition, tableName);
- }
+ int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+ List<List<String>> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum);
+ runDropBatches(tableName, batches);
} catch (Exception e) {
log.error("{} drop partition failed", tableId(databaseName, tableName), e);
throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e);
}
}
+ /**
+ * Drops partitions one batch at a time. When {@link #metaStoreClientPool} is present,
+ * batches fan out across the pool's worker threads (each borrowing an independent
+ * IMetaStoreClient); otherwise batches are dispatched sequentially against the
+ * session client. Hive has no batch-drop primitive that matches dropPartition's
+ * semantics, so each worker still iterates its chunk one partition at a time — the
+ * win is fanning chunks across independent Thrift clients.
+ */
+ private void runDropBatches(String tableName, List<List<String>> batches) throws Exception {
Review Comment:
🤖 nit: the parallel-path error-collection loop inside `runDropBatches`
(`Exception firstError = null; for (Future<Void> f : futures) …`) is nearly
identical to the one in `HMSDDLExecutor.runBatches`. Since `HiveDriverPool`
already has a well-factored `awaitAll(List<Future<?>>)` for exactly this
pattern, it might be worth adding a similar `awaitAll(List<Future<Void>>)` to
`IMetaStoreClientPool` so both callers can delegate to it instead.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
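One possible shape for that shared helper is sketched below. It assumes the helper is a static utility that both `HMSDDLExecutor.runBatches` and `HiveQueryDDLExecutor.runDropBatches` call. `BatchFutures.awaitAll` is a hypothetical name. It follows the same policy as `HiveDriverPool.awaitAll`: cancel the rest with `cancel(false)` and ignore our own cancellations. Unlike that method, it rethrows the unwrapped cause instead of wrapping it, which would also address the double-wrapping raised earlier in this review.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical shared helper for IMetaStoreClientPool callers; the class and method
// names are assumptions, not part of the PR.
public final class BatchFutures {

  private BatchFutures() {
  }

  /**
   * Awaits every future and rethrows the first failure's own exception (no extra
   * ExecutionException layer). After the first failure, the remaining futures are
   * cancelled with cancel(false): queued batches never start, while a batch that is
   * already running is not interrupted. Later failures are attached as suppressed.
   */
  public static void awaitAll(List<? extends Future<?>> futures) throws Exception {
    Exception firstError = null;
    for (int i = 0; i < futures.size(); i++) {
      try {
        futures.get(i).get();
      } catch (CancellationException ce) {
        // Cancelled by us after an earlier failure; not a new error.
      } catch (InterruptedException ie) {
        // Restore the interrupt flag, stop waiting, and keep queued batches from starting.
        Thread.currentThread().interrupt();
        cancelFrom(futures, i);
        throw firstError != null ? firstError : ie;
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        Exception real = (cause instanceof Exception) ? (Exception) cause : ee;
        if (firstError == null) {
          firstError = real;
          cancelFrom(futures, i + 1);
        } else if (real != firstError) {
          firstError.addSuppressed(real);
        }
      }
    }
    if (firstError != null) {
      throw firstError;
    }
  }

  private static void cancelFrom(List<? extends Future<?>> futures, int fromIndex) {
    for (int j = fromIndex; j < futures.size(); j++) {
      futures.get(j).cancel(false);
    }
  }
}
```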
--
This is an automated message from the Apache Git Service.