This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a6e463329 [core] Fix that some tag creations haven't handle tag
callbacks (#2550)
a6e463329 is described below
commit a6e463329131cbece788e9dfdc9ff25d09ca7535
Author: yuzelin <[email protected]>
AuthorDate: Thu Dec 21 13:27:17 2023 +0800
[core] Fix that some tag creations haven't handle tag callbacks (#2550)
---
.../java/org/apache/paimon/AbstractFileStore.java | 31 ++++++++-
.../org/apache/paimon/AppendOnlyFileStore.java | 6 +-
.../src/main/java/org/apache/paimon/FileStore.java | 4 ++
.../java/org/apache/paimon/KeyValueFileStore.java | 6 +-
.../paimon/table/AbstractFileStoreTable.java | 76 ++--------------------
.../paimon/table/AppendOnlyFileStoreTable.java | 3 +-
.../paimon/table/PrimaryKeyFileStoreTable.java | 3 +-
.../apache/paimon/table/sink/CallbackUtils.java | 75 +++++++++++++++++++++
.../org/apache/paimon/tag/TagAutoCreation.java | 15 +++--
.../java/org/apache/paimon/utils/TagManager.java | 11 +++-
.../test/java/org/apache/paimon/TestFileStore.java | 5 +-
.../apache/paimon/operation/FileDeletionTest.java | 28 ++++----
.../operation/UncleanedFileStoreExpireTest.java | 2 +-
.../sink/AutoTagForSavepointCommitterOperator.java | 18 +++--
.../flink/sink/BatchWriteGeneratorTagOperator.java | 2 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 3 +-
.../AutoTagForSavepointCommitterOperatorTest.java | 6 +-
17 files changed, 187 insertions(+), 107 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index cf6809b95..c5a976823 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
@@ -32,6 +34,9 @@ import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CallbackUtils;
+import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -42,7 +47,9 @@ import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
/**
* Base {@link FileStore} implementation.
@@ -56,6 +63,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
protected final long schemaId;
protected final CoreOptions options;
protected final RowType partitionType;
+ private final CatalogEnvironment catalogEnvironment;
@Nullable private final SegmentsCache<String> writeManifestCache;
@@ -64,12 +72,14 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
SchemaManager schemaManager,
long schemaId,
CoreOptions options,
- RowType partitionType) {
+ RowType partitionType,
+ CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.options = options;
this.partitionType = partitionType;
+ this.catalogEnvironment = catalogEnvironment;
MemorySize writeManifestCache = options.writeManifestCache();
this.writeManifestCache =
writeManifestCache.getBytes() == 0
@@ -229,6 +239,23 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
@Nullable
public TagAutoCreation newTagCreationManager() {
return TagAutoCreation.create(
- options, snapshotManager(), newTagManager(), newTagDeletion());
+ options,
+ snapshotManager(),
+ newTagManager(),
+ newTagDeletion(),
+ createTagCallbacks());
+ }
+
+ @Override
+ public List<TagCallback> createTagCallbacks() {
+ List<TagCallback> callbacks = new
ArrayList<>(CallbackUtils.loadTagCallbacks(options));
+ String partitionField = options.tagToPartitionField();
+ MetastoreClient.Factory metastoreClientFactory =
+ catalogEnvironment.metastoreClientFactory();
+ if (partitionField != null && metastoreClientFactory != null) {
+ callbacks.add(
+ new
AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
+ }
+ return callbacks;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a36d02bd8..ec1e7cb58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -29,6 +29,7 @@ import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
import java.util.Comparator;
@@ -53,8 +54,9 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
RowType partitionType,
RowType bucketKeyType,
RowType rowType,
- String tableName) {
- super(fileIO, schemaManager, schemaId, options, partitionType);
+ String tableName,
+ CatalogEnvironment catalogEnvironment) {
+ super(fileIO, schemaManager, schemaId, options, partitionType,
catalogEnvironment);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
this.tableName = tableName;
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index e67cf9f1e..f044102dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -31,6 +31,7 @@ import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.List;
/**
* File store interface.
@@ -89,4 +91,6 @@ public interface FileStore<T> extends Serializable {
TagAutoCreation newTagCreationManager();
boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
+
+ List<TagCallback> createTagCallbacks();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 98b88323f..43aadcbfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -34,6 +34,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -79,8 +80,9 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory,
- String tableName) {
- super(fileIO, schemaManager, schemaId, options, partitionType);
+ String tableName,
+ CatalogEnvironment catalogEnvironment) {
+ super(fileIO, schemaManager, schemaId, options, partitionType,
catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 14bc66316..41f0b447f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -34,12 +34,12 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
@@ -50,15 +50,12 @@ import
org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.tag.TagPreview;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -284,7 +281,8 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
private List<CommitCallback> createCommitCallbacks() {
- List<CommitCallback> callbacks = new
ArrayList<>(loadCommitCallbacks());
+ List<CommitCallback> callbacks =
+ new
ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
CoreOptions options = coreOptions();
MetastoreClient.Factory metastoreClientFactory =
catalogEnvironment.metastoreClientFactory();
@@ -308,62 +306,6 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
return callbacks;
}
- private List<TagCallback> createTagCallbacks() {
- List<TagCallback> callbacks = new ArrayList<>(loadTagCallbacks());
- String partitionField = coreOptions().tagToPartitionField();
- MetastoreClient.Factory metastoreClientFactory =
- catalogEnvironment.metastoreClientFactory();
- if (partitionField != null && metastoreClientFactory != null) {
- callbacks.add(
- new
AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
- }
- return callbacks;
- }
-
- private List<TagCallback> loadTagCallbacks() {
- return loadCallbacks(coreOptions().tagCallbacks(), TagCallback.class);
- }
-
- private List<CommitCallback> loadCommitCallbacks() {
- return loadCallbacks(coreOptions().commitCallbacks(),
CommitCallback.class);
- }
-
- @SuppressWarnings("unchecked")
- private <T> List<T> loadCallbacks(Map<String, String> clazzParamMaps,
Class<T> expectClass) {
- List<T> result = new ArrayList<>();
-
- for (Map.Entry<String, String> classParamEntry :
clazzParamMaps.entrySet()) {
- String className = classParamEntry.getKey();
- String param = classParamEntry.getValue();
-
- Class<?> clazz;
- try {
- clazz = Class.forName(className, true,
this.getClass().getClassLoader());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
- Preconditions.checkArgument(
- expectClass.isAssignableFrom(clazz),
- "Class " + clazz + " must implement " + expectClass);
-
- try {
- if (param == null) {
- result.add((T) clazz.newInstance());
- } else {
- result.add((T)
clazz.getConstructor(String.class).newInstance(param));
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to initialize commit callback "
- + className
- + (param == null ? "" : " with param " +
param),
- e);
- }
- }
- return result;
- }
-
private Optional<TableSchema> tryTimeTravel(Options options) {
CoreOptions coreOptions = new CoreOptions(options);
@@ -419,17 +361,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
fromSnapshotId);
Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
- tagManager().createTag(snapshot, tagName);
-
- List<TagCallback> callbacks = Collections.emptyList();
- try {
- callbacks = createTagCallbacks();
- callbacks.forEach(callback -> callback.notifyCreation(tagName));
- } finally {
- for (TagCallback tagCallback : callbacks) {
- IOUtils.closeQuietly(tagCallback);
- }
- }
+ tagManager().createTag(snapshot, tagName,
store().createTagCallbacks());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index dc865d93c..5f2c479e8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -81,7 +81,8 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.logicalPartitionType(),
tableSchema.logicalBucketKeyType(),
tableSchema.logicalRowType(),
- name());
+ name(),
+ catalogEnvironment);
}
return lazyStore;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 84a586066..fc4db704d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -109,7 +109,8 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
rowType,
extractor,
mfFactory,
- name());
+ name(),
+ catalogEnvironment);
}
return lazyStore;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
new file mode 100644
index 000000000..7d8a0a849
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to load callbacks. */
+public class CallbackUtils {
+
+ public static List<TagCallback> loadTagCallbacks(CoreOptions coreOptions) {
+ return loadCallbacks(coreOptions.tagCallbacks(), TagCallback.class);
+ }
+
+ public static List<CommitCallback> loadCommitCallbacks(CoreOptions
coreOptions) {
+ return loadCallbacks(coreOptions.commitCallbacks(),
CommitCallback.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> List<T> loadCallbacks(
+ Map<String, String> clazzParamMaps, Class<T> expectClass) {
+ List<T> result = new ArrayList<>();
+
+ for (Map.Entry<String, String> classParamEntry :
clazzParamMaps.entrySet()) {
+ String className = classParamEntry.getKey();
+ String param = classParamEntry.getValue();
+
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(className, true,
CallbackUtils.class.getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ Preconditions.checkArgument(
+ expectClass.isAssignableFrom(clazz),
+ "Class " + clazz + " must implement " + expectClass);
+
+ try {
+ if (param == null) {
+ result.add((T) clazz.newInstance());
+ } else {
+ result.add((T)
clazz.getConstructor(String.class).newInstance(param));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to initialize commit callback "
+ + className
+ + (param == null ? "" : " with param " +
param),
+ e);
+ }
+ }
+ return result;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index ba7069bd8..3b15d4891 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -21,6 +21,7 @@ package org.apache.paimon.tag;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -29,6 +30,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.time.LocalDateTime;
+import java.util.List;
import java.util.Optional;
import java.util.SortedMap;
@@ -45,6 +47,7 @@ public class TagAutoCreation {
private final TagPeriodHandler periodHandler;
private final Duration delay;
private final Integer numRetainedMax;
+ private final List<TagCallback> callbacks;
private LocalDateTime nextTag;
private long nextSnapshot;
@@ -56,7 +59,8 @@ public class TagAutoCreation {
TagTimeExtractor timeExtractor,
TagPeriodHandler periodHandler,
Duration delay,
- Integer numRetainedMax) {
+ Integer numRetainedMax,
+ List<TagCallback> callbacks) {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.tagDeletion = tagDeletion;
@@ -64,6 +68,7 @@ public class TagAutoCreation {
this.periodHandler = periodHandler;
this.delay = delay;
this.numRetainedMax = numRetainedMax;
+ this.callbacks = callbacks;
this.periodHandler.validateDelay(delay);
@@ -127,7 +132,7 @@ public class TagAutoCreation {
|| isAfterOrEqual(time.minus(delay),
periodHandler.nextTagTime(nextTag))) {
LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
String tagName = periodHandler.timeToTag(thisTag);
- tagManager.createTag(snapshot, tagName);
+ tagManager.createTag(snapshot, tagName, callbacks);
nextTag = periodHandler.nextTagTime(thisTag);
if (numRetainedMax != null) {
@@ -156,7 +161,8 @@ public class TagAutoCreation {
CoreOptions options,
SnapshotManager snapshotManager,
TagManager tagManager,
- TagDeletion tagDeletion) {
+ TagDeletion tagDeletion,
+ List<TagCallback> callbacks) {
TagTimeExtractor extractor =
TagTimeExtractor.createForAutoTag(options);
if (extractor == null) {
return null;
@@ -168,6 +174,7 @@ public class TagAutoCreation {
extractor,
TagPeriodHandler.create(options),
options.tagCreationDelay(),
- options.tagNumRetainedMax());
+ options.tagNumRetainedMax(),
+ callbacks);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index bd526ee2b..a83e201d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ public class TagManager {
}
/** Create a tag from given snapshot and save it in the storage. */
- public void createTag(Snapshot snapshot, String tagName) {
+ public void createTag(Snapshot snapshot, String tagName, List<TagCallback>
callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.",
tagName);
checkArgument(
@@ -85,6 +86,14 @@ public class TagManager {
tagName, newTagPath),
e);
}
+
+ try {
+ callbacks.forEach(callback -> callback.notifyCreation(tagName));
+ } finally {
+ for (TagCallback tagCallback : callbacks) {
+ IOUtils.closeQuietly(tagCallback);
+ }
+ }
}
public void deleteTag(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index fbf058f7f..6bd62a54b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -39,11 +39,13 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -113,7 +115,8 @@ public class TestFileStore extends KeyValueFileStore {
valueType,
keyValueFieldsExtractor,
mfFactory,
- (new Path(root)).getName());
+ (new Path(root)).getName(),
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
this.root = root;
this.fileIO = FileIOFinder.find(new Path(root));
this.keySerializer = new InternalRowSerializer(keyType);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index d150ca457..97708bfc2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -76,11 +76,13 @@ public class FileDeletionTest {
private long commitIdentifier;
private String root;
+ private TagManager tagManager;
@BeforeEach
public void setup() throws Exception {
commitIdentifier = 0L;
root = tempDir.toString();
+ tagManager = null;
}
/**
@@ -221,7 +223,7 @@ public class FileDeletionTest {
@Test
public void testExpireWithExistingTags() throws Exception {
TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
- TagManager tagManager = new TagManager(fileIO, store.options().path());
+ tagManager = new TagManager(fileIO, store.options().path());
SnapshotManager snapshotManager = store.snapshotManager();
TestKeyValueGenerator gen =
new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -237,7 +239,7 @@ public class FileDeletionTest {
// step 2: commit -A (by clean bucket 0) and create tag1
cleanBucket(store, gen.getPartition(gen.next()), 0);
- tagManager.createTag(snapshotManager.snapshot(2), "tag1");
+ createTag(snapshotManager.snapshot(2), "tag1");
assertThat(tagManager.tagExists("tag1")).isTrue();
// step 3: commit C to bucket 2
@@ -248,7 +250,7 @@ public class FileDeletionTest {
// step 4: commit -B (by clean bucket 1) and create tag2
cleanBucket(store, partition, 1);
- tagManager.createTag(snapshotManager.snapshot(4), "tag2");
+ createTag(snapshotManager.snapshot(4), "tag2");
assertThat(tagManager.tagExists("tag2")).isTrue();
// step 5: commit D to bucket 3
@@ -299,7 +301,7 @@ public class FileDeletionTest {
@Test
public void testExpireWithUpgradeAndTags() throws Exception {
TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
- TagManager tagManager = new TagManager(fileIO, store.options().path());
+ tagManager = new TagManager(fileIO, store.options().path());
SnapshotManager snapshotManager = store.snapshotManager();
TestKeyValueGenerator gen =
new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -328,7 +330,7 @@ public class FileDeletionTest {
// snapshot 3: commit -A (by clean bucket 0)
cleanBucket(store, gen.getPartition(gen.next()), 0);
- tagManager.createTag(snapshotManager.snapshot(1), "tag1");
+ createTag(snapshotManager.snapshot(1), "tag1");
store.newExpire(1, 1, Long.MAX_VALUE).expire();
// check data file and manifests
@@ -353,7 +355,7 @@ public class FileDeletionTest {
@Test
public void testDeleteTagWithSnapshot() throws Exception {
TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3);
- TagManager tagManager = new TagManager(fileIO, store.options().path());
+ tagManager = new TagManager(fileIO, store.options().path());
SnapshotManager snapshotManager = store.snapshotManager();
TestKeyValueGenerator gen =
new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -385,7 +387,7 @@ public class FileDeletionTest {
Arrays.asList(snapshot1.baseManifestList(),
snapshot1.deltaManifestList());
// create tag1
- tagManager.createTag(snapshot1, "tag1");
+ createTag(snapshot1, "tag1");
// expire snapshot 1, 2
store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -426,7 +428,7 @@ public class FileDeletionTest {
@Test
public void testDeleteTagWithOtherTag() throws Exception {
TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3);
- TagManager tagManager = new TagManager(fileIO, store.options().path());
+ tagManager = new TagManager(fileIO, store.options().path());
SnapshotManager snapshotManager = store.snapshotManager();
TestKeyValueGenerator gen =
new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -459,9 +461,9 @@ public class FileDeletionTest {
Arrays.asList(snapshot2.baseManifestList(),
snapshot2.deltaManifestList());
// create tags
- tagManager.createTag(snapshotManager.snapshot(1), "tag1");
- tagManager.createTag(snapshotManager.snapshot(2), "tag2");
- tagManager.createTag(snapshotManager.snapshot(4), "tag3");
+ createTag(snapshotManager.snapshot(1), "tag1");
+ createTag(snapshotManager.snapshot(2), "tag2");
+ createTag(snapshotManager.snapshot(4), "tag3");
// expire snapshot 1, 2, 3, 4
store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -703,4 +705,8 @@ public class FileDeletionTest {
store.snapshotManager().latestSnapshot(),
null);
}
+
+ private void createTag(Snapshot snapshot, String tagName) {
+ tagManager.createTag(snapshot, tagName, Collections.emptyList());
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ecaf5ae8e..a0794168f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -97,7 +97,7 @@ public class UncleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
// create tags for each snapshot
for (int id = 1; id <= latestSnapshotId; id++) {
Snapshot snapshot = snapshotManager.snapshot(id);
- tagManager.createTag(snapshot, "tag" + id);
+ tagManager.createTag(snapshot, "tag" + id,
Collections.emptyList());
}
// randomly expire snapshots
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index 02c5b9606..da3425e9b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -71,13 +72,17 @@ public class AutoTagForSavepointCommitterOperator<CommitT,
GlobalCommitT>
private final SerializableSupplier<TagDeletion> tagDeletionFactory;
+ private final SerializableSupplier<List<TagCallback>> callbacksSupplier;
+
private final NavigableSet<Long> identifiersForTags;
- protected SnapshotManager snapshotManager;
+ private transient SnapshotManager snapshotManager;
+
+ private transient TagManager tagManager;
- protected TagManager tagManager;
+ private transient TagDeletion tagDeletion;
- protected TagDeletion tagDeletion;
+ private transient List<TagCallback> callbacks;
private transient ListState<Long> identifiersForTagsState;
@@ -85,11 +90,13 @@ public class AutoTagForSavepointCommitterOperator<CommitT,
GlobalCommitT>
CommitterOperator<CommitT, GlobalCommitT> commitOperator,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory,
- SerializableSupplier<TagDeletion> tagDeletionFactory) {
+ SerializableSupplier<TagDeletion> tagDeletionFactory,
+ SerializableSupplier<List<TagCallback>> callbacksSupplier) {
this.commitOperator = commitOperator;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.tagDeletionFactory = tagDeletionFactory;
+ this.callbacksSupplier = callbacksSupplier;
this.identifiersForTags = new TreeSet<>();
}
@@ -102,6 +109,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT,
GlobalCommitT>
snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();
tagDeletion = tagDeletionFactory.get();
+ callbacks = callbacksSupplier.get();
identifiersForTagsState =
commitOperator
@@ -159,7 +167,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT,
GlobalCommitT>
for (Snapshot snapshot : snapshotForTags) {
String tagName = SAVEPOINT_TAG_PREFIX +
snapshot.commitIdentifier();
if (!tagManager.tagExists(tagName)) {
- tagManager.createTag(snapshot, tagName);
+ tagManager.createTag(snapshot, tagName, callbacks);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index dff75ff29..36ae32d15 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -117,7 +117,7 @@ public class BatchWriteGeneratorTagOperator<CommitT,
GlobalCommitT>
tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
}
// Create a new tag
- tagManager.createTag(snapshot, tagName);
+ tagManager.createTag(snapshot, tagName,
table.store().createTagCallbacks());
// Expire the tag
expireTag();
} catch (Exception e) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 480d41ae8..7d3cecdbf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -202,7 +202,8 @@ public abstract class FlinkSink<T> implements Serializable {
(CommitterOperator<Committable,
ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager,
- () -> table.store().newTagDeletion());
+ () -> table.store().newTagDeletion(),
+ () -> table.store().createTagCallbacks());
}
if (conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.BATCH
&& table.coreOptions().tagCreationMode() ==
TagCreationMode.BATCH) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index 60839805f..880e052c7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -206,7 +206,8 @@ public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorT
super.createCommitterOperator(table, commitUser,
committableStateManager),
table::snapshotManager,
table::tagManager,
- () -> table.store().newTagDeletion());
+ () -> table.store().newTagDeletion(),
+ () -> table.store().createTagCallbacks());
}
@Override
@@ -221,6 +222,7 @@ public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorT
table, commitUser, committableStateManager,
initializeFunction),
table::snapshotManager,
table::tagManager,
- () -> table.store().newTagDeletion());
+ () -> table.store().newTagDeletion(),
+ () -> table.store().createTagCallbacks());
}
}