This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e551e33ea [core] Move commit callbacks from TableCommit into FileStoreCommit (#3834)
e551e33ea is described below
commit e551e33ea5c6a2d14c54c4469519a437ae73c0e7
Author: tsreaper <[email protected]>
AuthorDate: Mon Jul 29 19:16:51 2024 +0800
[core] Move commit callbacks from TableCommit into FileStoreCommit (#3834)
---
.../java/org/apache/paimon/AbstractFileStore.java | 10 +++++-
.../src/main/java/org/apache/paimon/FileStore.java | 3 ++
.../org/apache/paimon/catalog/AbstractCatalog.java | 12 +++----
.../paimon/iceberg/IcebergCommitCallback.java | 29 +++++++++++-----
.../metastore/AddPartitionCommitCallback.java | 19 +++++++++--
.../paimon/metastore/TagPreviewCommitCallback.java | 20 ++++++++---
.../apache/paimon/operation/FileStoreCommit.java | 10 +++---
.../paimon/operation/FileStoreCommitImpl.java | 39 +++++++++++++++++-----
.../paimon/privilege/PrivilegedFileStore.java | 7 ++++
.../paimon/table/AbstractFileStoreTable.java | 5 ++-
.../apache/paimon/table/sink/CommitCallback.java | 7 +++-
.../apache/paimon/table/sink/TableCommitImpl.java | 29 +++-------------
.../test/java/org/apache/paimon/TestFileStore.java | 14 +++++---
.../apache/paimon/operation/FileDeletionTest.java | 1 +
.../paimon/operation/FileStoreCommitTest.java | 18 +++++++---
.../paimon/operation/FileStoreTestUtils.java | 5 +--
.../apache/paimon/operation/TestCommitThread.java | 3 +-
.../apache/paimon/table/sink/TableCommitTest.java | 12 +++++--
.../flink/procedure/DropPartitionProcedure.java | 11 +++---
.../SupportsRowLevelOperationFlinkTableSink.java | 18 +++++-----
.../paimon/spark/PaimonPartitionManagement.scala | 1 +
21 files changed, 178 insertions(+), 95 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index e33463d57..49d2c47f0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -43,6 +43,7 @@ import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -55,6 +56,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -184,6 +186,11 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
+ return newCommit(commitUser, Collections.emptyList());
+ }
+
+ @Override
+ public FileStoreCommitImpl newCommit(String commitUser,
List<CommitCallback> callbacks) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
@@ -205,7 +212,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.branch(),
newStatsFileHandler(),
bucketMode(),
- options.scanManifestParallelism());
+ options.scanManifestParallelism(),
+ callbacks);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index dae508733..38b7321fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -33,6 +33,7 @@ import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -79,6 +80,8 @@ public interface FileStore<T> {
FileStoreCommit newCommit(String commitUser);
+ FileStoreCommit newCommit(String commitUser, List<CommitCallback>
callbacks);
+
SnapshotDeletion newSnapshotDeletion();
ChangelogDeletion newChangelogDeletion();
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index e5b0255b9..8b0245735 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -172,13 +172,14 @@ public abstract class AbstractCatalog implements Catalog {
throws TableNotExistException {
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileStoreCommit commit =
+ try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
-
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
- commit.dropPartitions(
- Collections.singletonList(partitionSpec),
BatchWriteBuilder.COMMIT_IDENTIFIER);
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+ commit.dropPartitions(
+ Collections.singletonList(partitionSpec),
BatchWriteBuilder.COMMIT_IDENTIFIER);
+ }
}
protected abstract void createDatabaseImpl(String name, Map<String,
String> properties);
@@ -354,8 +355,7 @@ public abstract class AbstractCatalog implements Catalog {
}
return table;
} else {
- Table table = getDataTable(identifier);
- return table;
+ return getDataTable(identifier);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 95dae5569..158f05a32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -33,6 +33,7 @@ import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
@@ -45,6 +46,8 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -111,18 +114,26 @@ public class IcebergCommitCallback implements
CommitCallback {
}
@Override
- public void call(List<ManifestCommittable> committables) {
- for (ManifestCommittable committable : committables) {
- try {
- commitMetadata(committable);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ public void call(
+ List<ManifestEntry> committedEntries, long identifier, @Nullable
Long watermark) {
+ try {
+ commitMetadata(identifier);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void retry(ManifestCommittable committable) {
+ try {
+ commitMetadata(committable.identifier());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
- private void commitMetadata(ManifestCommittable committable) throws
IOException {
- Pair<Long, Long> pair =
getCurrentAndBaseSnapshotIds(committable.identifier());
+ private void commitMetadata(long identifier) throws IOException {
+ Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(identifier);
long currentSnapshot = pair.getLeft();
Long baseSnapshot = pair.getRight();
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 5ca2a03f8..27468bc2f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -19,13 +19,17 @@
package org.apache.paimon.metastore;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.List;
@@ -48,9 +52,18 @@ public class AddPartitionCommitCallback implements
CommitCallback {
}
@Override
- public void call(List<ManifestCommittable> committables) {
- committables.stream()
- .flatMap(c -> c.fileCommittables().stream())
+ public void call(
+ List<ManifestEntry> committedEntries, long identifier, @Nullable
Long watermark) {
+ committedEntries.stream()
+ .filter(e -> FileKind.ADD.equals(e.kind()))
+ .map(ManifestEntry::partition)
+ .distinct()
+ .forEach(this::addPartition);
+ }
+
+ @Override
+ public void retry(ManifestCommittable committable) {
+ committable.fileCommittables().stream()
.map(CommitMessage::partition)
.distinct()
.forEach(this::addPartition);
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
index e0aa8597b..3c477f917 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -19,9 +19,12 @@
package org.apache.paimon.metastore;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.tag.TagPreview;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Optional;
@@ -37,12 +40,19 @@ public class TagPreviewCommitCallback implements
CommitCallback {
}
@Override
- public void call(List<ManifestCommittable> committables) {
+ public void call(
+ List<ManifestEntry> committedEntries, long identifier, @Nullable
Long watermark) {
+ long currentMillis = System.currentTimeMillis();
+ Optional<String> tagOptional = tagPreview.extractTag(currentMillis,
watermark);
+ tagOptional.ifPresent(tagCallback::notifyCreation);
+ }
+
+ @Override
+ public void retry(ManifestCommittable committable) {
long currentMillis = System.currentTimeMillis();
- for (ManifestCommittable c : committables) {
- Optional<String> tagOptional =
tagPreview.extractTag(currentMillis, c.watermark());
- tagOptional.ifPresent(tagCallback::notifyCreation);
- }
+ Optional<String> tagOptional =
+ tagPreview.extractTag(currentMillis, committable.watermark());
+ tagOptional.ifPresent(tagCallback::notifyCreation);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index f43308c30..a63e2b733 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -28,18 +28,17 @@ import org.apache.paimon.utils.FileStorePathFactory;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/** Commit operation which provides commit and overwrite. */
-public interface FileStoreCommit {
+public interface FileStoreCommit extends AutoCloseable {
/** With global lock. */
FileStoreCommit withLock(Lock lock);
FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
- /** Find out which commit identifier need to be retried when recovering
from the failure. */
- Set<Long> filterCommitted(Set<Long> commitIdentifiers);
+ /** Find out which committables need to be retried when recovering from
the failure. */
+ List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables);
/** Commit from manifest committable. */
void commit(ManifestCommittable committable, Map<String, String>
properties);
@@ -88,4 +87,7 @@ public interface FileStoreCommit {
FileStorePathFactory pathFactory();
FileIO fileIO();
+
+ @Override
+ void close();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4d29bc188..39a66ae8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -45,10 +45,12 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
@@ -121,6 +123,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;
@Nullable private final Integer manifestReadParallelism;
+ private final List<CommitCallback> commitCallbacks;
@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
@@ -152,7 +155,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
String branchName,
StatsFileHandler statsFileHandler,
BucketMode bucketMode,
- @Nullable Integer manifestReadParallelism) {
+ @Nullable Integer manifestReadParallelism,
+ List<CommitCallback> commitCallbacks) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
@@ -172,6 +176,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.keyComparator = keyComparator;
this.branchName = branchName;
this.manifestReadParallelism = manifestReadParallelism;
+ this.commitCallbacks = commitCallbacks;
this.lock = null;
this.ignoreEmptyCommit = true;
@@ -193,25 +198,33 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
@Override
- public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
+ public List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables) {
// nothing to filter, fast exit
- if (commitIdentifiers.isEmpty()) {
- return commitIdentifiers;
+ if (committables.isEmpty()) {
+ return committables;
+ }
+
+ for (int i = 1; i < committables.size(); i++) {
+ Preconditions.checkArgument(
+ committables.get(i).identifier() > committables.get(i -
1).identifier(),
+ "Committables must be sorted according to identifiers
before filtering. This is unexpected.");
}
Optional<Snapshot> latestSnapshot =
snapshotManager.latestSnapshotOfUser(commitUser);
if (latestSnapshot.isPresent()) {
- Set<Long> result = new HashSet<>();
- for (Long identifier : commitIdentifiers) {
+ List<ManifestCommittable> result = new ArrayList<>();
+ for (ManifestCommittable committable : committables) {
// if committable is newer than latest snapshot, then it
hasn't been committed
- if (identifier > latestSnapshot.get().commitIdentifier()) {
- result.add(identifier);
+ if (committable.identifier() >
latestSnapshot.get().commitIdentifier()) {
+ result.add(committable);
+ } else {
+ commitCallbacks.forEach(callback ->
callback.retry(committable));
}
}
return result;
} else {
// if there is no previous snapshots then nothing should be
filtered
- return commitIdentifiers;
+ return committables;
}
}
@@ -986,6 +999,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
identifier,
commitKind.name()));
}
+ commitCallbacks.forEach(callback -> callback.call(tableFiles,
identifier, watermark));
return true;
}
@@ -1229,6 +1243,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
}
+ @Override
+ public void close() {
+ for (CommitCallback callback : commitCallbacks) {
+ IOUtils.closeQuietly(callback);
+ }
+ }
+
private static class LevelIdentifier {
private final BinaryRow partition;
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index ca2ad04a2..1870813e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -36,6 +36,7 @@ import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -139,6 +140,12 @@ public class PrivilegedFileStore<T> implements
FileStore<T> {
return wrapped.newCommit(commitUser);
}
+ @Override
+ public FileStoreCommit newCommit(String commitUser, List<CommitCallback>
callbacks) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newCommit(commitUser, callbacks);
+ }
+
@Override
public SnapshotDeletion newSnapshotDeletion() {
privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 818b6e87e..d30fd7308 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -367,8 +367,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
}
return new TableCommitImpl(
- store().newCommit(commitUser),
- createCommitCallbacks(commitUser),
+ store().newCommit(commitUser,
createCommitCallbacks(commitUser)),
snapshotExpire,
options.writeOnly() ? null :
store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
@@ -389,7 +388,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
if (options.partitionedTableInMetastore()
&& metastoreClientFactory != null
- && tableSchema.partitionKeys().size() > 0) {
+ && !tableSchema.partitionKeys().isEmpty()) {
callbacks.add(new
AddPartitionCommitCallback(metastoreClientFactory.create()));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index 2d6fbc8e8..23f39229d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -19,6 +19,9 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+
+import javax.annotation.Nullable;
import java.util.List;
@@ -35,5 +38,7 @@ import java.util.List;
*/
public interface CommitCallback extends AutoCloseable {
- void call(List<ManifestCommittable> committables);
+ void call(List<ManifestEntry> committedEntries, long identifier, @Nullable
Long watermark);
+
+ void retry(ManifestCommittable committable);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 66e715fd6..c769aa4a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -55,7 +55,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -73,7 +72,6 @@ public class TableCommitImpl implements InnerTableCommit {
private static final Logger LOG =
LoggerFactory.getLogger(TableCommitImpl.class);
private final FileStoreCommit commit;
- private final List<CommitCallback> commitCallbacks;
@Nullable private final Runnable expireSnapshots;
@Nullable private final PartitionExpire partitionExpire;
@Nullable private final TagAutoManager tagAutoManager;
@@ -93,7 +91,6 @@ public class TableCommitImpl implements InnerTableCommit {
public TableCommitImpl(
FileStoreCommit commit,
- List<CommitCallback> commitCallbacks,
@Nullable Runnable expireSnapshots,
@Nullable PartitionExpire partitionExpire,
@Nullable TagAutoManager tagAutoManager,
@@ -109,7 +106,6 @@ public class TableCommitImpl implements InnerTableCommit {
}
this.commit = commit;
- this.commitCallbacks = commitCallbacks;
this.expireSnapshots = expireSnapshots;
this.partitionExpire = partitionExpire;
this.tagAutoManager = tagAutoManager;
@@ -228,8 +224,6 @@ public class TableCommitImpl implements InnerTableCommit {
commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
expire(committable.identifier(), expireMainExecutor);
}
-
- commitCallbacks.forEach(c -> c.call(committables));
}
public int filterAndCommitMultiple(List<ManifestCommittable> committables)
{
@@ -238,26 +232,13 @@ public class TableCommitImpl implements InnerTableCommit {
public int filterAndCommitMultiple(
List<ManifestCommittable> committables, boolean checkAppendFiles) {
- Set<Long> retryIdentifiers =
- commit.filterCommitted(
- committables.stream()
- .map(ManifestCommittable::identifier)
- .collect(Collectors.toSet()));
-
- // commitCallback may fail after the snapshot file is successfully
created,
- // so we have to try all of them again
- List<ManifestCommittable> succeededCommittables =
- committables.stream()
- .filter(c ->
!retryIdentifiers.contains(c.identifier()))
- .collect(Collectors.toList());
- commitCallbacks.forEach(c -> c.call(succeededCommittables));
-
- List<ManifestCommittable> retryCommittables =
+ List<ManifestCommittable> sortedCommittables =
committables.stream()
- .filter(c -> retryIdentifiers.contains(c.identifier()))
// identifier must be in increasing order
.sorted(Comparator.comparingLong(ManifestCommittable::identifier))
.collect(Collectors.toList());
+ List<ManifestCommittable> retryCommittables =
commit.filterCommitted(sortedCommittables);
+
if (!retryCommittables.isEmpty()) {
checkFilesExistence(retryCommittables);
commitMultiple(retryCommittables, checkAppendFiles);
@@ -378,9 +359,7 @@ public class TableCommitImpl implements InnerTableCommit {
@Override
public void close() throws Exception {
- for (CommitCallback commitCallback : commitCallbacks) {
- IOUtils.closeQuietly(commitCallback);
- }
+ commit.close();
IOUtils.closeQuietly(lock);
expireMainExecutor.shutdownNow();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index db822d7be..bd6950d77 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -246,14 +246,15 @@ public class TestFileStore extends KeyValueFileStore {
}
public Snapshot dropPartitions(List<Map<String, String>> partitions) {
- FileStoreCommit commit = newCommit(commitUser);
-
SnapshotManager snapshotManager = snapshotManager();
Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
- commit.dropPartitions(partitions, Long.MAX_VALUE);
+
+ try (FileStoreCommit commit = newCommit(commitUser)) {
+ commit.dropPartitions(partitions, Long.MAX_VALUE);
+ }
Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
assertThat(snapshotIdAfterCommit).isNotNull();
@@ -316,7 +317,6 @@ public class TestFileStore extends KeyValueFileStore {
.write(kv);
}
- FileStoreCommit commit = newCommit(commitUser);
ManifestCommittable committable =
new ManifestCommittable(
identifier == null ? commitIdentifier++ : identifier,
watermark);
@@ -341,7 +341,11 @@ public class TestFileStore extends KeyValueFileStore {
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
- commitFunction.accept(commit, committable);
+
+ try (FileStoreCommit commit = newCommit(commitUser)) {
+ commitFunction.accept(commit, committable);
+ }
+
Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
if (snapshotIdAfterCommit == null) {
snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index b6c80ad74..17693b073 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -155,6 +155,7 @@ public class FileDeletionTest {
partitionSpec.put("hr", "8");
commit.overwrite(
partitionSpec, new ManifestCommittable(commitIdentifier++),
Collections.emptyMap());
+ commit.close();
// step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
BinaryRow partition = partitions.get(7);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index a60554db2..aeccafb85 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -71,7 +71,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
@@ -182,7 +181,9 @@ public class FileStoreCommitTest {
Path firstSnapshotPath =
snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
LocalFileIO.create().deleteQuietly(firstSnapshotPath);
// this test succeeds if this call does not fail
-
store.newCommit(UUID.randomUUID().toString()).filterCommitted(Collections.singleton(999L));
+ try (FileStoreCommit commit =
store.newCommit(UUID.randomUUID().toString())) {
+ commit.filterCommitted(Collections.singletonList(new
ManifestCommittable(999L)));
+ }
}
@Test
@@ -201,8 +202,15 @@ public class FileStoreCommitTest {
}
// all commit identifiers should be filtered out
- Set<Long> remaining =
store.newCommit(user).filterCommitted(commitIdentifiers);
- assertThat(remaining).isEmpty();
+ try (FileStoreCommit commit = store.newCommit(user)) {
+ assertThat(
+ commit.filterCommitted(
+ commitIdentifiers.stream()
+ .sorted()
+ .map(ManifestCommittable::new)
+ .collect(Collectors.toList())))
+ .isEmpty();
+ }
}
protected void testRandomConcurrentNoConflict(
@@ -850,6 +858,8 @@ public class FileStoreCommitTest {
readStats = statsFileHandler.readStats();
assertThat(readStats).isPresent();
assertThat(readStats.get()).isEqualTo(fakeStats);
+
+ fileStoreCommit.close();
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index e4743f770..dc2af066e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -90,7 +90,6 @@ public class FileStoreTestUtils {
long commitIdentifier,
Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers)
throws Exception {
- FileStoreCommit commit = store.newCommit();
ManifestCommittable committable = new
ManifestCommittable(commitIdentifier, null);
for (Map.Entry<BinaryRow, Map<Integer, RecordWriter<KeyValue>>>
entryWithPartition :
writers.entrySet()) {
@@ -106,7 +105,9 @@ public class FileStoreTestUtils {
}
}
- commit.commit(committable, Collections.emptyMap());
+ try (FileStoreCommit commit = store.newCommit()) {
+ commit.commit(committable, Collections.emptyMap());
+ }
writers.values().stream()
.flatMap(m -> m.values().stream())
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 748ec53cd..872cb554f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -193,8 +193,7 @@ public class TestCommitThread extends Thread {
while (true) {
try {
if (shouldCheckFilter) {
- if
(commit.filterCommitted(Collections.singleton(committable.identifier()))
- .isEmpty()) {
+ if
(commit.filterCommitted(Collections.singletonList(committable)).isEmpty()) {
break;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 34abc25b1..18f858c23 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -41,6 +42,8 @@ import org.apache.paimon.utils.FailingFileIO;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -177,8 +180,13 @@ public class TableCommitTest {
}
@Override
- public void call(List<ManifestCommittable> committables) {
- committables.forEach(c ->
commitCallbackResult.get(testId).add(c.identifier()));
+ public void call(List<ManifestEntry> entries, long identifier,
@Nullable Long watermark) {
+ commitCallbackResult.get(testId).add(identifier);
+ }
+
+ @Override
+ public void retry(ManifestCommittable committable) {
+ commitCallbackResult.get(testId).add(committable.identifier());
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 76c1ea096..3a231758f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -52,14 +52,15 @@ public class DropPartitionProcedure extends ProcedureBase {
FileStoreTable fileStoreTable =
(FileStoreTable)
catalog.getTable(Identifier.fromString(tableId));
- FileStoreCommit commit =
+ try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
-
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
- commit.dropPartitions(
- ParameterUtils.getPartitions(partitionStrings),
- BatchWriteBuilder.COMMIT_IDENTIFIER);
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+ commit.dropPartitions(
+ ParameterUtils.getPartitions(partitionStrings),
+ BatchWriteBuilder.COMMIT_IDENTIFIER);
+ }
return new String[] {"Success"};
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index e809fc22d..583c1c9d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -161,18 +161,18 @@ public abstract class
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
@Override
public Optional<Long> executeDeletion() {
FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileStoreCommit commit =
+ try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
-
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
- long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
- if (deletePredicate == null) {
- commit.truncateTable(identifier);
- return Optional.empty();
- } else {
- checkArgument(deleteIsDropPartition());
-
commit.dropPartitions(Collections.singletonList(deletePartitions()),
identifier);
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+ long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
+ if (deletePredicate == null) {
+ commit.truncateTable(identifier);
+ } else {
+ checkArgument(deleteIsDropPartition());
+
commit.dropPartitions(Collections.singletonList(deletePartitions()),
identifier);
+ }
return Optional.empty();
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 85c85435e..1aabcddf1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -59,6 +59,7 @@ trait PaimonPartitionManagement extends
SupportsPartitionManagement {
commit.dropPartitions(
Collections.singletonList(partitionMap),
BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.close()
true
case _ =>
throw new UnsupportedOperationException("Only FileStoreTable supports
drop partitions.")