This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d32bfce601 [Fix][HiveSink] Implement overwrite semantics for streaming commits to prevent multiple deletions of target directories (#10279)
d32bfce601 is described below
commit d32bfce601ede01a3ff3758d6c1e1fd83f95dd8a
Author: Adam Wang <[email protected]>
AuthorDate: Thu Jan 8 19:12:19 2026 +0800
[Fix][HiveSink] Implement overwrite semantics for streaming commits to prevent multiple deletions of target directories (#10279)
Co-authored-by: wangxiaogang <[email protected]>
---
docs/en/connector-v2/sink/Hive.md | 9 +-
docs/zh/connector-v2/sink/Hive.md | 11 +-
.../hive/commit/HiveSinkAggregatedCommitter.java | 169 ++++++++++-
...kAggregatedCommitterOverwriteStreamingTest.java | 331 +++++++++++++++++++++
4 files changed, 505 insertions(+), 15 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 3b629e6154..4c8625e69b 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -102,6 +102,11 @@ Support writing Parquet INT96 from a timestamp, only valid for parquet files.
### overwrite [boolean]
+Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.
+
+- Batch mode (BATCH): Delete existing data in the target path before commit (for non-partitioned tables, delete the table directory; for partitioned tables, delete the related partition directories), then write new data.
+- Streaming mode (STREAMING): In streaming jobs with checkpointing enabled, `commit()` is invoked after each completed checkpoint. To avoid deleting on every checkpoint (which would wipe previously committed files), SeaTunnel deletes each target directory (table directory / partition directory) at most once (empty commits will skip deletion). On recovery, the delete step is best-effort and may be skipped to avoid deleting already committed data, so streaming overwrite is not a strict snapshot-style overwrite.
+
### data_save_mode [enum]
Select how to handle existing data on the target before writing new data.
@@ -112,8 +117,6 @@ Select how to handle existing data on the target before writing new data.
Note: overwrite=true and data_save_mode=DROP_DATA are equivalent. Use either one; do not set both.
-Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.
-
### schema_save_mode [enum]
Before starting the synchronization task, different processing schemes are selected for the existing table structure on the target side.
@@ -574,4 +577,4 @@ sink {
```
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
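
Editor's note: the streaming-mode bullet in the English doc above describes a "delete each target directory at most once" guard. The sketch below is a minimal standalone illustration of that idea only; the class and method names are illustrative, and the connector's real implementation is HiveSinkAggregatedCommitter further down in this diff.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative sketch: remember which target directories were already wiped so
// that repeated streaming commits do not delete previously committed files.
class DeleteOnceGuard {
    private final Set<String> deletedDirs = ConcurrentHashMap.newKeySet();

    /** Deletes {@code dir} via {@code deleteFn} only the first time it is seen. */
    boolean deleteOnce(String dir, Consumer<String> deleteFn) {
        if (dir == null || dir.isEmpty()) {
            return false;
        }
        // Set.add returns false when the directory was handled by an earlier commit,
        // so later checkpoints keep the files they already committed.
        if (!deletedDirs.add(dir)) {
            return false;
        }
        deleteFn.accept(dir);
        return true;
    }
}
```

Empty commits never reach the deletion path, which matches the "empty commits will skip deletion" note above.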
diff --git a/docs/zh/connector-v2/sink/Hive.md b/docs/zh/connector-v2/sink/Hive.md
index af3ad07780..1ba385ba53 100644
--- a/docs/zh/connector-v2/sink/Hive.md
+++ b/docs/zh/connector-v2/sink/Hive.md
@@ -100,7 +100,12 @@ Kerberos 的 keytab 文件路径
支持从时间戳写入 Parquet INT96,仅对 parquet 文件有效。
-### schema_save_mode [枚举]
+### overwrite [boolean]
+
+是否以覆盖写入(Overwrite)方式写入 Hive。
+
+- 批模式(BATCH):在提交前删除目标路径中已有数据(非分区表删除表目录;分区表删除本次提交涉及的分区目录),再写入新数据。
+- 流模式(STREAMING):在启用 checkpoint 的流式运行时,commit 会在每个 checkpoint 完成后触发一次。为避免每个 checkpoint 都重复删除导致数据丢失,SeaTunnel 会对每个目标目录(表目录/分区目录)最多删除一次(空提交会跳过删除)。恢复(recovery)场景下为避免误删已提交数据,删除行为为 best-effort,可能会被跳过,因此不保证严格的“全量覆盖”语义。
### data_save_mode [enum]
@@ -112,6 +117,8 @@ Kerberos 的 keytab 文件路径
注意:overwrite=true 与 data_save_mode=DROP_DATA 行为等价,二者择一配置即可,勿同时设置。
+### schema_save_mode [枚举]
+
在开始同步任务之前,针对目标端已存在的表结构选择不同的处理方案。
**默认值**: `CREATE_SCHEMA_WHEN_NOT_EXIST`
@@ -563,4 +570,4 @@ sink {
```
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 9a142fd66e..2c532c6d55 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -33,6 +33,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
@@ -42,6 +43,29 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
    private final boolean abortDropPartitionMetadata;
    private final org.apache.seatunnel.api.sink.DataSaveMode dataSaveMode;
+    /**
+     * Guard for overwrite semantics in Flink streaming engine.
+     *
+     * <p>In streaming mode, {@code commit()} is invoked on every completed checkpoint. For
+     * overwrite (DROP_DATA), we must avoid deleting target directories on every checkpoint;
+     * otherwise previously committed files will be wiped and only the last checkpoint's files
+     * remain.
+     *
+     * <p>We delete each target directory (partition directory / table directory) at most once per
+     * job attempt so that dynamic partitions can still be overwritten when first written.
+     */
+    private final Set<String> deletedTargetDirectories = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Best-effort recovery detection based on the first seen checkpoint id embedded in transaction
+     * directory name (e.g. .../T_xxx_0_2 means checkpoint 2).
+     *
+     * <p>If the first seen checkpoint id is greater than 1, it usually indicates the job is
+     * recovering from a previous checkpoint. In that case, deleting the target directories would
+     * destroy already committed data that is consistent with the restored state.
+     */
+    private volatile Long minCheckpointIdSeen = null;
+
    private final ReadonlyConfig readonlyConfig;
    private final HiveMetaStoreCatalog hiveMetaStore;
@@ -69,8 +93,15 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
            List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
        log.info("Aggregated commit infos: {}", aggregatedCommitInfos);
        if (dataSaveMode == org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA) {
-            log.info("DataSaveMode=DROP_DATA: delete existing target directories before commit.");
-            deleteDirectories(aggregatedCommitInfos);
+            updateMinCheckpointIdSeen(aggregatedCommitInfos);
+            if (minCheckpointIdSeen != null && minCheckpointIdSeen > 1) {
+                log.info(
+                        "DataSaveMode=DROP_DATA: skip deleting target directories before commit."
+                                + " Recovery is detected, minCheckpointIdSeen={}",
+                        minCheckpointIdSeen);
+            } else {
+                deleteDirectories(aggregatedCommitInfos);
+            }
        }
        List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
@@ -134,12 +165,14 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     *
     * @param aggregatedCommitInfos
     */
-    private void deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
+    private boolean deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
            throws IOException {
        if (aggregatedCommitInfos.isEmpty()) {
-            return;
+            return false;
        }
+        boolean anyDeleted = false;
+
        for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
            LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap =
                    aggregatedCommitInfo.getTransactionMap();
@@ -162,9 +195,19 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
                if (aggregatedCommitInfo.getPartitionDirAndValuesMap().isEmpty()) {
                    // For non-partitioned table, extract and delete table directory
                    // Example: hdfs://hadoop-master1:8020/warehouse/test_overwrite_1/
-                    String tableDir = targetPath.substring(0, targetPath.lastIndexOf('/'));
-                    hadoopFileSystemProxy.deleteFile(tableDir);
-                    log.info("Deleted table directory: {}", tableDir);
+                    int lastSeparator =
+                            Math.max(targetPath.lastIndexOf('/'), targetPath.lastIndexOf('\\'));
+                    if (lastSeparator <= 0) {
+                        log.warn(
+                                "Skip deleting table directory because target path has no separator: {}",
+                                targetPath);
+                        continue;
+                    }
+                    String tableDir = targetPath.substring(0, lastSeparator);
+                    if (deleteTargetDirectoryOnce(tableDir)) {
+                        log.info("Deleted table directory: {}", tableDir);
+                        anyDeleted = true;
+                    }
                } else {
                    // For partitioned table, extract and delete partition directories
                    // Example:
@@ -172,12 +215,25 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
                    Set<String> partitionDirs =
                            transactionMap.values().stream()
                                    .flatMap(m -> m.values().stream())
-                                    .map(path -> path.substring(0, path.lastIndexOf('/')))
+                                    .map(
+                                            path -> {
+                                                int sep =
+                                                        Math.max(
+                                                                path.lastIndexOf('/'),
+                                                                path.lastIndexOf('\\'));
+                                                if (sep <= 0) {
+                                                    return null;
+                                                }
+                                                return path.substring(0, sep);
+                                            })
+                                    .filter(p -> p != null && !p.isEmpty())
                                    .collect(Collectors.toSet());
                    for (String partitionDir : partitionDirs) {
-                        hadoopFileSystemProxy.deleteFile(partitionDir);
-                        log.info("Deleted partition directory: {}", partitionDir);
+                        if (deleteTargetDirectoryOnce(partitionDir)) {
+                            log.info("Deleted partition directory: {}", partitionDir);
+                            anyDeleted = true;
+                        }
                    }
                }
            } catch (IOException e) {
@@ -185,5 +241,98 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
                throw e;
            }
        }
+
+        return anyDeleted;
+    }
+
+    private boolean deleteTargetDirectoryOnce(String directory) throws IOException {
+        if (directory == null || directory.isEmpty()) {
+            return false;
+        }
+
+        String normalized = normalizeDirectoryPath(directory);
+        if (normalized.isEmpty()) {
+            return false;
+        }
+
+        if (!deletedTargetDirectories.add(normalized)) {
+            return false;
+        }
+
+        hadoopFileSystemProxy.deleteFile(directory);
+        return true;
+    }
+
+    private String normalizeDirectoryPath(String directory) {
+        String normalized = directory.replace('\\', '/');
+        while (normalized.endsWith("/")) {
+            normalized = normalized.substring(0, normalized.length() - 1);
+        }
+        return normalized;
+    }
+
+    private void updateMinCheckpointIdSeen(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
+        if (aggregatedCommitInfos == null || aggregatedCommitInfos.isEmpty()) {
+            return;
+        }
+
+        long minInThisCommit = Long.MAX_VALUE;
+        boolean found = false;
+
+        for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+            if (aggregatedCommitInfo == null || aggregatedCommitInfo.getTransactionMap() == null) {
+                continue;
+            }
+            for (String transactionDir : aggregatedCommitInfo.getTransactionMap().keySet()) {
+                long checkpointId = parseCheckpointIdFromTransactionDir(transactionDir);
+                if (checkpointId > 0) {
+                    minInThisCommit = Math.min(minInThisCommit, checkpointId);
+                    found = true;
+                }
+            }
+        }
+
+        if (!found) {
+            return;
+        }
+
+        if (minCheckpointIdSeen == null) {
+            minCheckpointIdSeen = minInThisCommit;
+        } else {
+            minCheckpointIdSeen = Math.min(minCheckpointIdSeen, minInThisCommit);
+        }
+    }
+
+    /**
+     * Parses checkpoint id from transaction directory.
+     *
+     * <p>Expected pattern in transaction dir name: .../T_..._<subtaskIndex>_<checkpointId>
+     */
+    private long parseCheckpointIdFromTransactionDir(String transactionDir) {
+        if (transactionDir == null || transactionDir.isEmpty()) {
+            return -1;
+        }
+
+        String normalized = transactionDir.replace('\\', '/');
+        while (normalized.endsWith("/")) {
+            normalized = normalized.substring(0, normalized.length() - 1);
+        }
+        int lastSlash = normalized.lastIndexOf('/');
+        String baseName = lastSlash >= 0 ? normalized.substring(lastSlash + 1) : normalized;
+        if (baseName.isEmpty()) {
+            return -1;
+        }
+
+        int lastUnderscore = baseName.lastIndexOf('_');
+        if (lastUnderscore < 0 || lastUnderscore == baseName.length() - 1) {
+            return -1;
+        }
+
+        String lastToken = baseName.substring(lastUnderscore + 1);
+        try {
+            return Long.parseLong(lastToken);
+        } catch (NumberFormatException ignored) {
+            return -1;
+        }
    }
}
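
Editor's note: the recovery heuristic above hinges on the transaction directory naming convention .../T_..._<subtaskIndex>_<checkpointId>. The snippet below is a self-contained illustration of how the first observed checkpoint id drives the skip decision; the paths and the helper are illustrative only, while the connector's actual parser is parseCheckpointIdFromTransactionDir shown in the diff above.

```java
// Illustrative sketch: parse the checkpoint id from a transaction directory name
// and skip the overwrite delete when the first id seen after (re)start is greater than 1.
public class CheckpointIdHeuristicExample {

    static long parseCheckpointId(String transactionDir) {
        // e.g. "/tmp/seatunnel/T_job_0_2" -> base name "T_job_0_2" -> last token "2"
        String name = transactionDir.substring(transactionDir.lastIndexOf('/') + 1);
        int lastUnderscore = name.lastIndexOf('_');
        if (lastUnderscore < 0 || lastUnderscore == name.length() - 1) {
            return -1;
        }
        try {
            return Long.parseLong(name.substring(lastUnderscore + 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        long freshJob = parseCheckpointId("/tmp/seatunnel/T_job_0_1"); // first id 1 -> fresh start
        long restored = parseCheckpointId("/tmp/seatunnel/T_job_0_3"); // first id 3 -> recovery
        System.out.println(freshJob > 1); // false: overwrite delete runs (at most once per directory)
        System.out.println(restored > 1); // true: overwrite delete is skipped to protect committed data
    }
}
```

This is why the docs call streaming overwrite best-effort: the heuristic only inspects the first checkpoint id it sees, so a job restored from state never re-deletes target directories.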
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitterOverwriteStreamingTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitterOverwriteStreamingTest.java
new file mode 100644
index 0000000000..fb8a6bf5ee
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitterOverwriteStreamingTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.commit;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+class HiveSinkAggregatedCommitterOverwriteStreamingTest {
+
+    private static class TestableCommitter extends HiveSinkAggregatedCommitter {
+        TestableCommitter(
+                ReadonlyConfig cfg, String dbName, String tableName, HadoopConf hadoopConf) {
+            super(cfg, dbName, tableName, hadoopConf);
+        }
+
+        void setFileSystemProxy(HadoopFileSystemProxy proxy) {
+            this.hadoopFileSystemProxy = proxy;
+        }
+    }
+
+ @Test
+ void shouldDeletePartitionDirectoryOnlyOnceAcrossStreamingCheckpoints()
throws Exception {
+ // Given
+ ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
+ TestableCommitter committer =
+ new TestableCommitter(readonlyConfig, "db", "tbl", new
HadoopConf("hdfs://dummy"));
+
+ HiveMetaStoreCatalog hiveMetaStore =
Mockito.mock(HiveMetaStoreCatalog.class);
+ Mockito.doNothing()
+ .when(hiveMetaStore)
+ .addPartitions(Mockito.anyString(), Mockito.anyString(),
Mockito.anyList());
+ setHiveMetaStore(committer, hiveMetaStore);
+
+ HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
+ committer.setFileSystemProxy(fs);
+
+ String partitionDir = "/warehouse/db/tbl/pt=2025-12-16";
+
+ // checkpoint 1: empty transaction (matches production log pattern)
+ FileAggregatedCommitInfo cp1Empty =
+ aggregatedCommitInfo(
+ "/tmp/seatunnel/T_job_0_1", Collections.emptyMap(),
Collections.emptyMap());
+
+ // checkpoint 2: has one file -> should trigger overwrite deletion once
+ FileAggregatedCommitInfo cp2 =
+ aggregatedCommitInfo(
+ "/tmp/seatunnel/T_job_0_2",
+ Collections.singletonMap(
+
"/tmp/seatunnel/T_job_0_2/pt=2025-12-16/f1.parquet",
+ partitionDir + "/f1.parquet"),
+ Collections.singletonMap(
+ "pt=2025-12-16",
Collections.singletonList("2025-12-16")));
+
+ // checkpoint 3: has one more file -> MUST NOT delete partitionDir
again
+ FileAggregatedCommitInfo cp3 =
+ aggregatedCommitInfo(
+ "/tmp/seatunnel/T_job_0_3",
+ Collections.singletonMap(
+
"/tmp/seatunnel/T_job_0_3/pt=2025-12-16/f2.parquet",
+ partitionDir + "/f2.parquet"),
+ Collections.singletonMap(
+ "pt=2025-12-16",
Collections.singletonList("2025-12-16")));
+
+ // When
+ committer.commit(Collections.singletonList(cp1Empty));
+ committer.commit(Collections.singletonList(cp2));
+ committer.commit(Collections.singletonList(cp3));
+
+ // Then
+ // deleteFile is also used to delete transaction dirs in
super.commit(). We only assert
+ // deletion of the *target* partition directory happens once.
+ Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir);
+ }
+
+ @Test
+ void
shouldDeleteEachNewPartitionDirectoryOnlyOnceAcrossStreamingCheckpoints()
+ throws Exception {
+ // Given
+ ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
+ TestableCommitter committer =
+ new TestableCommitter(readonlyConfig, "db", "tbl", new
HadoopConf("hdfs://dummy"));
+
+ HiveMetaStoreCatalog hiveMetaStore =
Mockito.mock(HiveMetaStoreCatalog.class);
+ Mockito.doNothing()
+ .when(hiveMetaStore)
+ .addPartitions(Mockito.anyString(), Mockito.anyString(),
Mockito.anyList());
+ setHiveMetaStore(committer, hiveMetaStore);
+
+ HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
+ committer.setFileSystemProxy(fs);
+
+ String partitionDir1 = "/warehouse/db/tbl/pt=2025-12-16";
+ String partitionDir2 = "/warehouse/db/tbl/pt=2025-12-17";
+
+ // checkpoint 1: empty transaction
+ FileAggregatedCommitInfo cp1Empty =
+ aggregatedCommitInfo(
+ "/tmp/seatunnel/T_job_0_1", Collections.emptyMap(),
Collections.emptyMap());
+
+ // checkpoint 2: first partition
+ FileAggregatedCommitInfo cp2 =
+ aggregatedCommitInfo(
+ "/tmp/seatunnel/T_job_0_2",
+ Collections.singletonMap(
+
"/tmp/seatunnel/T_job_0_2/pt=2025-12-16/f1.parquet",
+ partitionDir1 + "/f1.parquet"),
+ Collections.singletonMap(
+ "pt=2025-12-16",
Collections.singletonList("2025-12-16")));
+
+ // checkpoint 3: new partition appears
+ FileAggregatedCommitInfo cp3 =
+ aggregatedCommitInfo(
+ "/tmp/seatunnel/T_job_0_3",
+ Collections.singletonMap(
+
"/tmp/seatunnel/T_job_0_3/pt=2025-12-17/f2.parquet",
+ partitionDir2 + "/f2.parquet"),
+ Collections.singletonMap(
+ "pt=2025-12-17",
Collections.singletonList("2025-12-17")));
+
+ // When
+ committer.commit(Collections.singletonList(cp1Empty));
+ committer.commit(Collections.singletonList(cp2));
+ committer.commit(Collections.singletonList(cp3));
+
+ // Then
+ Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir1);
+ Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir2);
+ }
+
+ @Test
+ void
e2eLikeCommitShouldAccumulateFilesAcrossCheckpointsWhenOverwriteEnabled(
+ @TempDir Path tempDir) throws Exception {
+ // Given
+ ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
+ TestableCommitter committer =
+ new TestableCommitter(readonlyConfig, "db", "tbl", new
HadoopConf("hdfs://dummy"));
+
+ HiveMetaStoreCatalog hiveMetaStore =
Mockito.mock(HiveMetaStoreCatalog.class);
+ Mockito.doNothing()
+ .when(hiveMetaStore)
+ .addPartitions(Mockito.anyString(), Mockito.anyString(),
Mockito.anyList());
+ setHiveMetaStore(committer, hiveMetaStore);
+
+ // Build a mock FS proxy that actually moves/deletes on local FS.
+ HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
+ Mockito.doAnswer(
+ invocation -> {
+ String oldPath = invocation.getArgument(0);
+ String newPath = invocation.getArgument(1);
+ boolean removeWhenExists =
invocation.getArgument(2);
+
+ Path oldP = Paths.get(oldPath);
+ Path newP = Paths.get(newPath);
+
+ if (!Files.exists(oldP)) {
+ return null;
+ }
+
+ if (removeWhenExists && Files.exists(newP)) {
+ Files.delete(newP);
+ }
+ if (newP.getParent() != null) {
+ Files.createDirectories(newP.getParent());
+ }
+ Files.move(oldP, newP,
StandardCopyOption.REPLACE_EXISTING);
+ return null;
+ })
+ .when(fs)
+ .renameFile(Mockito.anyString(), Mockito.anyString(),
Mockito.anyBoolean());
+
+ Mockito.doAnswer(
+ invocation -> {
+ String pathStr = invocation.getArgument(0);
+ Path p = Paths.get(pathStr);
+ if (!Files.exists(p)) {
+ return null;
+ }
+ // delete recursively
+ try (Stream<Path> walk = Files.walk(p)) {
+ walk.sorted((a, b) -> b.getNameCount() -
a.getNameCount())
+ .forEach(
+ x -> {
+ try {
+
Files.deleteIfExists(x);
+ } catch (Exception e) {
+ throw new
RuntimeException(e);
+ }
+ });
+ }
+ return null;
+ })
+ .when(fs)
+ .deleteFile(Mockito.anyString());
+
+ committer.setFileSystemProxy(fs);
+
+ Path targetPartitionDir =
tempDir.resolve("warehouse/db/tbl/pt=2025-12-16");
+ String partitionDir = targetPartitionDir.toString();
+
+ // checkpoint 1: empty transaction
+ FileAggregatedCommitInfo cp1Empty =
+ aggregatedCommitInfo(
+ tempDir.resolve("txn/T_job_0_1").toString(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ // checkpoint 2: create a temp file to be moved
+ Path txn2 = tempDir.resolve("txn/T_job_0_2");
+ Path tmpFile1 = txn2.resolve("pt=2025-12-16/f1.parquet");
+ Files.createDirectories(tmpFile1.getParent());
+ Files.write(tmpFile1, "file1".getBytes(StandardCharsets.UTF_8));
+
+ FileAggregatedCommitInfo cp2 =
+ aggregatedCommitInfo(
+ txn2.toString(),
+ Collections.singletonMap(
+ tmpFile1.toString(),
+
targetPartitionDir.resolve("f1.parquet").toString()),
+ Collections.singletonMap(
+ "pt=2025-12-16",
Collections.singletonList("2025-12-16")));
+
+ // checkpoint 3: another temp file
+ Path txn3 = tempDir.resolve("txn/T_job_0_3");
+ Path tmpFile2 = txn3.resolve("pt=2025-12-16/f2.parquet");
+ Files.createDirectories(tmpFile2.getParent());
+ Files.write(tmpFile2, "file2".getBytes(StandardCharsets.UTF_8));
+
+ FileAggregatedCommitInfo cp3 =
+ aggregatedCommitInfo(
+ txn3.toString(),
+ Collections.singletonMap(
+ tmpFile2.toString(),
+
targetPartitionDir.resolve("f2.parquet").toString()),
+ Collections.singletonMap(
+ "pt=2025-12-16",
Collections.singletonList("2025-12-16")));
+
+ // When
+ committer.commit(Collections.singletonList(cp1Empty));
+ committer.commit(Collections.singletonList(cp2));
+ committer.commit(Collections.singletonList(cp3));
+
+ // Then
+ Assertions.assertTrue(Files.isDirectory(targetPartitionDir));
+
Assertions.assertTrue(Files.exists(targetPartitionDir.resolve("f1.parquet")));
+
Assertions.assertTrue(Files.exists(targetPartitionDir.resolve("f2.parquet")));
+
+ long fileCount;
+ try (Stream<Path> stream = Files.list(targetPartitionDir)) {
+ fileCount = stream.count();
+ }
+ Assertions.assertEquals(2, fileCount);
+
+ // sanity: partition deletion should only happen once
+ Mockito.verify(fs, Mockito.times(1)).deleteFile(partitionDir);
+ }
+
+ private static FileAggregatedCommitInfo aggregatedCommitInfo(
+ String transactionDir,
+ Map<String, String> fileMoves,
+ Map<String, List<String>> partitions) {
+ LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap =
new LinkedHashMap<>();
+ LinkedHashMap<String, String> moveMap = new LinkedHashMap<>();
+ moveMap.putAll(fileMoves);
+ transactionMap.put(transactionDir, moveMap);
+
+ LinkedHashMap<String, List<String>> partitionMap = new
LinkedHashMap<>();
+ partitionMap.putAll(partitions);
+
+ return new FileAggregatedCommitInfo(transactionMap, partitionMap);
+ }
+
+ private static ReadonlyConfig minimalHiveReadonlyConfig(boolean overwrite)
{
+ LinkedHashMap<String, Object> map = new LinkedHashMap<>();
+ // Required by HiveMetaStoreCatalog ctor
+ map.put(HiveOptions.METASTORE_URI.key(), "thrift://dummy:9083");
+ map.put(HiveConfig.HADOOP_CONF_PATH.key(), "/tmp");
+ map.put(HiveConfig.HIVE_SITE_PATH.key(), "/tmp/hive-site.xml");
+
+ // Used by HiveSinkAggregatedCommitter
+ map.put(HiveSinkOptions.OVERWRITE.key(), overwrite);
+ // other options are defaulted
+
+ return ReadonlyConfig.fromMap(map);
+ }
+
+ private static void setHiveMetaStore(
+ HiveSinkAggregatedCommitter committer, HiveMetaStoreCatalog
hiveMetaStore)
+ throws Exception {
+ Field f =
HiveSinkAggregatedCommitter.class.getDeclaredField("hiveMetaStore");
+ f.setAccessible(true);
+ f.set(committer, hiveMetaStore);
+ }
+}
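
Editor's note: the new tests exercise the delete-once behavior but not the recovery-skip branch. Below is a sketch of an additional case one might add to the same test class (hypothetical, not part of this commit); it reuses the class's existing helpers (TestableCommitter, minimalHiveReadonlyConfig, aggregatedCommitInfo, setHiveMetaStore), so it would have to live inside HiveSinkAggregatedCommitterOverwriteStreamingTest.

```java
// Hypothetical additional case: when the first committed checkpoint id is greater
// than 1, recovery is assumed and the target partition directory must not be deleted.
@Test
void shouldSkipDeletionWhenFirstSeenCheckpointIdIndicatesRecovery() throws Exception {
    ReadonlyConfig readonlyConfig = minimalHiveReadonlyConfig(true);
    TestableCommitter committer =
            new TestableCommitter(readonlyConfig, "db", "tbl", new HadoopConf("hdfs://dummy"));

    HiveMetaStoreCatalog hiveMetaStore = Mockito.mock(HiveMetaStoreCatalog.class);
    Mockito.doNothing()
            .when(hiveMetaStore)
            .addPartitions(Mockito.anyString(), Mockito.anyString(), Mockito.anyList());
    setHiveMetaStore(committer, hiveMetaStore);

    HadoopFileSystemProxy fs = Mockito.mock(HadoopFileSystemProxy.class);
    committer.setFileSystemProxy(fs);

    String partitionDir = "/warehouse/db/tbl/pt=2025-12-16";

    // First commit after restore carries checkpoint id 3 (> 1) -> recovery is assumed.
    FileAggregatedCommitInfo cp3 =
            aggregatedCommitInfo(
                    "/tmp/seatunnel/T_job_0_3",
                    Collections.singletonMap(
                            "/tmp/seatunnel/T_job_0_3/pt=2025-12-16/f3.parquet",
                            partitionDir + "/f3.parquet"),
                    Collections.singletonMap(
                            "pt=2025-12-16", Collections.singletonList("2025-12-16")));

    committer.commit(Collections.singletonList(cp3));

    // The already committed partition directory must not be wiped on recovery.
    Mockito.verify(fs, Mockito.never()).deleteFile(partitionDir);
}
```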