This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 313f8b236 [core] Force creating snapshot for tag with auto process 
time creation (#2188)
313f8b236 is described below

commit 313f8b2368f6e8d7948a1cf28117166a6ddb7b64
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Oct 30 20:04:17 2023 +0800

    [core] Force creating snapshot for tag with auto process time creation 
(#2188)
---
 .../manifest/WrappedManifestCommittable.java       | 90 +++++++++++-----------
 .../apache/paimon/table/sink/TableCommitImpl.java  |  4 +
 .../org/apache/paimon/tag/TagAutoCreation.java     |  4 +
 .../org/apache/paimon/tag/TagTimeExtractor.java    | 12 +++
 .../manifest/WrappedManifestCommittableTest.java   |  4 +-
 .../org/apache/paimon/flink/sink/Committer.java    |  2 +
 .../paimon/flink/sink/CommitterOperator.java       | 11 ++-
 .../apache/paimon/flink/sink/StoreCommitter.java   |  5 ++
 .../paimon/flink/sink/StoreMultiCommitter.java     | 36 +++++++--
 .../sink/WrappedManifestCommittableSerializer.java | 11 ++-
 .../paimon/flink/sink/CommitterOperatorTest.java   | 42 ++++++++++
 .../flink/sink/CommitterOperatorTestBase.java      |  6 ++
 .../paimon/flink/sink/StoreMultiCommitterTest.java | 90 +++++++++++++++++++---
 .../WrappedManifestCommittableSerializerTest.java  |  5 +-
 14 files changed, 248 insertions(+), 74 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
index 5b72f8d8c..c5e8527fb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
@@ -28,15 +28,44 @@ import java.util.TreeMap;
 /** Manifest commit message. */
 public class WrappedManifestCommittable {
 
-    private Map<Identifier, ManifestCommittable> manifestCommittables;
+    private final long checkpointId;
 
-    public WrappedManifestCommittable() {
+    private final long watermark;
+
+    private final Map<Identifier, ManifestCommittable> manifestCommittables;
+
+    public WrappedManifestCommittable(long checkpointId, long watermark) {
+        this.checkpointId = checkpointId;
+        this.watermark = watermark;
         this.manifestCommittables =
                 new TreeMap<>(
                         Comparator.comparing(Identifier::getDatabaseName)
                                 .thenComparing(Identifier::getObjectName));
     }
 
+    public long checkpointId() {
+        return checkpointId;
+    }
+
+    public long watermark() {
+        return watermark;
+    }
+
+    public Map<Identifier, ManifestCommittable> manifestCommittables() {
+        return manifestCommittables;
+    }
+
+    public ManifestCommittable computeCommittableIfAbsent(
+            Identifier identifier, long checkpointId, long watermark) {
+        return manifestCommittables.computeIfAbsent(
+                identifier, id -> new ManifestCommittable(checkpointId, 
watermark));
+    }
+
+    public void putManifestCommittable(
+            Identifier identifier, ManifestCommittable manifestCommittable) {
+        manifestCommittables.put(identifier, manifestCommittable);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -46,58 +75,25 @@ public class WrappedManifestCommittable {
             return false;
         }
         WrappedManifestCommittable that = (WrappedManifestCommittable) o;
-
-        if (manifestCommittables.size() != that.manifestCommittables.size()) {
-            return false;
-        }
-
-        for (Map.Entry<Identifier, ManifestCommittable> entry : 
manifestCommittables.entrySet()) {
-            if (!Objects.equals(entry.getValue(), 
that.manifestCommittables.get(entry.getKey()))) {
-                return false;
-            }
-        }
-
-        return true;
+        return checkpointId == that.checkpointId
+                && watermark == that.watermark
+                && Objects.equals(manifestCommittables, 
that.manifestCommittables);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(manifestCommittables.values().toArray(new 
Object[0]));
+        return Objects.hash(checkpointId, watermark, manifestCommittables);
     }
 
     @Override
     public String toString() {
-        return String.format(
-                "WrappedManifestCommittable {" + "manifestCommittables = %s",
-                formatManifestCommittables());
-    }
-
-    private String formatManifestCommittables() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("{");
-        for (Identifier id : manifestCommittables.keySet()) {
-            ManifestCommittable committable = manifestCommittables.get(id);
-            sb.append(String.format("%s=%s, ", id.getFullName(), 
committable.toString()));
-        }
-        if (manifestCommittables.size() > 0) {
-            sb.delete(sb.length() - 2, sb.length());
-        }
-        sb.append("}");
-        return sb.toString();
-    }
-
-    public ManifestCommittable computeCommittableIfAbsent(
-            Identifier identifier, long checkpointId, long watermark) {
-        return manifestCommittables.computeIfAbsent(
-                identifier, id -> new ManifestCommittable(checkpointId, 
watermark));
-    }
-
-    public ManifestCommittable putManifestCommittable(
-            Identifier identifier, ManifestCommittable manifestCommittable) {
-        return manifestCommittables.put(identifier, manifestCommittable);
-    }
-
-    public Map<Identifier, ManifestCommittable> getManifestCommittables() {
-        return manifestCommittables;
+        return "WrappedManifestCommittable{"
+                + "checkpointId="
+                + checkpointId
+                + ", watermark="
+                + watermark
+                + ", manifestCommittables="
+                + manifestCommittables
+                + '}';
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 3d6dc98d7..8fe553e96 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -119,6 +119,10 @@ public class TableCommitImpl implements InnerTableCommit {
         this.tableName = tableName;
     }
 
+    public boolean forceCreatingSnapshot() {
+        return tagAutoCreation != null && 
tagAutoCreation.forceCreatingSnapshot();
+    }
+
     @Override
     public TableCommitImpl withOverwrite(@Nullable Map<String, String> 
overwritePartitions) {
         this.overwritePartition = overwritePartitions;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 6ec7f1cf2..e4a8e47e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -80,6 +80,10 @@ public class TagAutoCreation {
         }
     }
 
+    public boolean forceCreatingSnapshot() {
+        return timeExtractor.forceCreatingSnapshot();
+    }
+
     public void run() {
         while (true) {
             if (snapshotManager.snapshotExists(nextSnapshot)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index dadfde3a7..c7315f6b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -33,6 +33,8 @@ public interface TagTimeExtractor {
 
     Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark);
 
+    boolean forceCreatingSnapshot();
+
     /** Extract time from snapshot time millis. */
     class ProcessTimeExtractor implements TagTimeExtractor {
 
@@ -43,6 +45,11 @@ public interface TagTimeExtractor {
                             .atZone(ZoneId.systemDefault())
                             .toLocalDateTime());
         }
+
+        @Override
+        public boolean forceCreatingSnapshot() {
+            return true;
+        }
     }
 
     /** Extract time from snapshot watermark. */
@@ -63,6 +70,11 @@ public interface TagTimeExtractor {
             return Optional.of(
                     
Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
         }
+
+        @Override
+        public boolean forceCreatingSnapshot() {
+            return false;
+        }
     }
 
     @Nullable
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
index 67ec4c9c7..7eef1b427 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
@@ -32,12 +32,12 @@ class WrappedManifestCommittableTest {
         ManifestCommittable committable1 = create();
         ManifestCommittable committable2 = create();
 
-        WrappedManifestCommittable wrapped1 = new WrappedManifestCommittable();
+        WrappedManifestCommittable wrapped1 = new 
WrappedManifestCommittable(-1, -1);
         wrapped1.putManifestCommittable(Identifier.create("db", "table1"), 
committable1);
         wrapped1.putManifestCommittable(Identifier.create("db", "table2"), 
committable2);
 
         // add manifest committables in reverse order
-        WrappedManifestCommittable wrapped2 = new WrappedManifestCommittable();
+        WrappedManifestCommittable wrapped2 = new 
WrappedManifestCommittable(-1, -1);
         wrapped2.putManifestCommittable(Identifier.create("db", "table2"), 
committable2);
         wrapped2.putManifestCommittable(Identifier.create("db", "table1"), 
committable1);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index c80fc39e0..a58d97e4c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -35,6 +35,8 @@ import java.util.Map;
  */
 public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
 
+    boolean forceCreatingSnapshot();
+
     /** Compute an aggregated committable from a list of committables. */
     GlobalCommitT combine(long checkpointId, long watermark, List<CommitT> 
committables)
             throws IOException;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index b27bb3bb9..16563cb77 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
@@ -154,8 +155,16 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
         NavigableMap<Long, GlobalCommitT> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
-        committer.commit(committables(headMap));
+        List<GlobalCommitT> committables = committables(headMap);
+        committer.commit(committables);
         headMap.clear();
+
+        if (committables.isEmpty()) {
+            if (committer.forceCreatingSnapshot()) {
+                GlobalCommitT commit = toCommittables(checkpointId, 
Collections.emptyList());
+                committer.commit(Collections.singletonList(commit));
+            }
+        }
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 1025fa930..7857242bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -60,6 +60,11 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
         return committerMetrics;
     }
 
+    @Override
+    public boolean forceCreatingSnapshot() {
+        return commit.forceCreatingSnapshot();
+    }
+
     @Override
     public ManifestCommittable combine(
             long checkpointId, long watermark, List<Committable> committables) 
{
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index ad420de68..089311208 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -79,10 +79,16 @@ public class StoreMultiCommitter
         this.isCompactJob = isCompactJob;
     }
 
+    @Override
+    public boolean forceCreatingSnapshot() {
+        return true;
+    }
+
     @Override
     public WrappedManifestCommittable combine(
             long checkpointId, long watermark, List<MultiTableCommittable> 
committables) {
-        WrappedManifestCommittable wrappedManifestCommittable = new 
WrappedManifestCommittable();
+        WrappedManifestCommittable wrappedManifestCommittable =
+                new WrappedManifestCommittable(checkpointId, watermark);
         for (MultiTableCommittable committable : committables) {
 
             ManifestCommittable manifestCommittable =
@@ -109,15 +115,29 @@ public class StoreMultiCommitter
     @Override
     public void commit(List<WrappedManifestCommittable> committables)
             throws IOException, InterruptedException {
+        if (committables.isEmpty()) {
+            return;
+        }
 
         // key by table id
         Map<Identifier, List<ManifestCommittable>> committableMap = 
groupByTable(committables);
-
-        for (Map.Entry<Identifier, List<ManifestCommittable>> entry : 
committableMap.entrySet()) {
-            Identifier tableId = entry.getKey();
-            List<ManifestCommittable> committableList = entry.getValue();
-            StoreCommitter committer = getStoreCommitter(tableId);
-            committer.commit(committableList);
+        committableMap.keySet().forEach(this::getStoreCommitter);
+
+        long checkpointId = committables.get(0).checkpointId();
+        long watermark = committables.get(0).watermark();
+        for (Map.Entry<Identifier, StoreCommitter> entry : 
tableCommitters.entrySet()) {
+            List<ManifestCommittable> committableList = 
committableMap.get(entry.getKey());
+            StoreCommitter committer = entry.getValue();
+            if (committableList != null) {
+                committer.commit(committableList);
+            } else {
+                // try best to commit empty snapshot, but tableCommitters may 
not contain all tables
+                if (committer.forceCreatingSnapshot()) {
+                    ManifestCommittable combine =
+                            committer.combine(checkpointId, watermark, 
Collections.emptyList());
+                    committer.commit(Collections.singletonList(combine));
+                }
+            }
         }
     }
 
@@ -138,7 +158,7 @@ public class StoreMultiCommitter
                 .flatMap(
                         wrapped -> {
                             Map<Identifier, ManifestCommittable> 
manifestCommittables =
-                                    wrapped.getManifestCommittables();
+                                    wrapped.manifestCommittables();
                             return manifestCommittables.entrySet().stream()
                                     .map(entry -> Tuple2.of(entry.getKey(), 
entry.getValue()));
                         })
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
index 65eb12a13..324452ce5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
@@ -54,8 +54,11 @@ public class WrappedManifestCommittableSerializer
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
 
+        view.writeLong(wrapped.checkpointId());
+        view.writeLong(wrapped.watermark());
+
         // Serialize ManifestCommittable map inside WrappedManifestCommittable
-        Map<Identifier, ManifestCommittable> map = 
wrapped.getManifestCommittables();
+        Map<Identifier, ManifestCommittable> map = 
wrapped.manifestCommittables();
         view.writeInt(map.size());
         for (Map.Entry<Identifier, ManifestCommittable> entry : 
map.entrySet()) {
             byte[] serializedKey = 
entry.getKey().getFullName().getBytes(StandardCharsets.UTF_8);
@@ -83,9 +86,13 @@ public class WrappedManifestCommittableSerializer
 
         DataInputDeserializer view = new DataInputDeserializer(serialized);
 
+        long checkpointId = view.readLong();
+        long watermark = view.readLong();
+
         // Deserialize ManifestCommittable map inside 
WrappedManifestCommittable
         int mapSize = view.readInt();
-        WrappedManifestCommittable wrappedManifestCommittable = new 
WrappedManifestCommittable();
+        WrappedManifestCommittable wrappedManifestCommittable =
+                new WrappedManifestCommittable(checkpointId, watermark);
         for (int i = 0; i < mapSize; i++) {
             int keyLength = view.readInt();
             byte[] serializedKey = new byte[keyLength];
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 9aa1fb2d6..8a9bf96e2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
@@ -332,6 +334,46 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         
assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
     }
 
+    @Test
+    public void testEmptyCommit() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                createRecoverableTestHarness(table);
+        testHarness.open();
+
+        testHarness.snapshot(1, 1);
+        testHarness.notifyOfCompletedCheckpoint(1);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNull();
+    }
+
+    @Test
+    public void testEmptyCommitWithProcessTimeTag() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options ->
+                                options.set(
+                                        CoreOptions.TAG_AUTOMATIC_CREATION,
+                                        
CoreOptions.TagCreationMode.PROCESS_TIME));
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                createRecoverableTestHarness(table);
+        testHarness.open();
+
+        testHarness.snapshot(1, 1);
+        testHarness.notifyOfCompletedCheckpoint(1);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+        assertThat(snapshot.id()).isEqualTo(1);
+
+        testHarness.snapshot(2, 2);
+        testHarness.notifyOfCompletedCheckpoint(2);
+        snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+        assertThat(snapshot.id()).isEqualTo(2);
+    }
+
     // ------------------------------------------------------------------------
     //  Metrics tests
     // ------------------------------------------------------------------------
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
index c383ff063..d9454f736 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -87,8 +88,13 @@ public abstract class CommitterOperatorTestBase {
     }
 
     protected FileStoreTable createFileStoreTable() throws Exception {
+        return createFileStoreTable(options -> {});
+    }
+
+    protected FileStoreTable createFileStoreTable(Consumer<Options> 
setOptions) throws Exception {
         Options conf = new Options();
         conf.set(CoreOptions.PATH, tablePath.toString());
+        setOptions.accept(conf);
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
         schemaManager.createTable(
                 new Schema(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index aa3a8229b..8d2666d58 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -66,24 +67,26 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
 class StoreMultiCommitterTest {
+
     private String initialCommitUser;
     private Path warehouse;
     private Catalog.Loader catalogLoader;
     private Catalog catalog;
-    private String databaseName;
     private Identifier firstTable;
     private Identifier secondTable;
     private Path firstTablePath;
     private Path secondTablePath;
     @TempDir public java.nio.file.Path tempDir;
 
-    private void createTestTables(Catalog catalog, Tuple2<Identifier, 
Schema>... tableSpecs)
+    @SafeVarargs
+    private final void createTestTables(Catalog catalog, Tuple2<Identifier, 
Schema>... tableSpecs)
             throws Exception {
         for (Tuple2<Identifier, Schema> spec : tableSpecs) {
             catalog.createTable(spec.f0, spec.f1, false);
@@ -94,7 +97,7 @@ class StoreMultiCommitterTest {
     public void beforeEach() throws Exception {
         initialCommitUser = UUID.randomUUID().toString();
         warehouse = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
-        databaseName = "test_db";
+        String databaseName = "test_db";
         firstTable = Identifier.create(databaseName, "test_table1");
         secondTable = Identifier.create(databaseName, "test_table2");
 
@@ -112,21 +115,24 @@ class StoreMultiCommitterTest {
                             DataTypes.INT(), DataTypes.DOUBLE(), 
DataTypes.VARCHAR(5),
                         },
                         new String[] {"a", "b", "c"});
-        Options conf = new Options();
 
+        Options firstOptions = new Options();
+        firstOptions.set(
+                CoreOptions.TAG_AUTOMATIC_CREATION, 
CoreOptions.TagCreationMode.PROCESS_TIME);
         Schema firstTableSchema =
                 new Schema(
                         rowType1.getFields(),
                         Collections.emptyList(),
                         Collections.emptyList(),
-                        conf.toMap(),
+                        firstOptions.toMap(),
                         "");
+
         Schema secondTableSchema =
                 new Schema(
                         rowType2.getFields(),
                         Collections.emptyList(),
                         Collections.emptyList(),
-                        conf.toMap(),
+                        Collections.emptyMap(),
                         "");
         createTestTables(
                 catalog,
@@ -140,6 +146,7 @@ class StoreMultiCommitterTest {
     //  Recoverable operator tests
     // ------------------------------------------------------------------------
 
+    @SuppressWarnings("CatchMayIgnoreException")
     @Test
     public void testFailIntentionallyAfterRestore() throws Exception {
         FileStoreTable table = (FileStoreTable) catalog.getTable(firstTable);
@@ -204,7 +211,7 @@ class StoreMultiCommitterTest {
 
         // test restore and fail for second table
         // checkpoint is completed but not notified, so no snapshot is 
committed
-        snapshot = testHarness.snapshot(1, timestamp++);
+        snapshot = testHarness.snapshot(1, timestamp);
         assertThat(table.snapshotManager().latestSnapshotId()).isNull();
         testHarness.close();
 
@@ -396,7 +403,7 @@ class StoreMultiCommitterTest {
                             secondTable, new Committable(2, 
Committable.Kind.FILE, committable)),
                     timestamp++);
         }
-        testHarness.snapshot(3, timestamp++);
+        testHarness.snapshot(3, timestamp);
         testHarness.notifyOfCompletedCheckpoint(3);
 
         write1.close();
@@ -437,7 +444,8 @@ class StoreMultiCommitterTest {
         testHarness.processWatermark(new Watermark(1024));
         testHarness.snapshot(cpId, timestamp++);
         testHarness.notifyOfCompletedCheckpoint(cpId);
-        
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
+        
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+                .isEqualTo(1024L);
         assertThat(table2.snapshotManager().latestSnapshot()).isNull();
 
         // write to both tables on second watermark
@@ -453,11 +461,69 @@ class StoreMultiCommitterTest {
                                 write1.prepareCommit(true, cpId).get(0))),
                 timestamp++);
         testHarness.processWatermark(new Watermark(2048));
+        testHarness.snapshot(cpId, timestamp);
+        testHarness.notifyOfCompletedCheckpoint(cpId);
+        testHarness.close();
+        
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+                .isEqualTo(2048L);
+        
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+                .isEqualTo(2048L);
+    }
+
+    @Test
+    public void testEmptyCommit() throws Exception {
+        // table1: TagCreationMode.PROCESS_TIME
+        FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
+        FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
+
+        StreamTableWrite write1 =
+                
table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+        StreamTableWrite write2 =
+                
table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+        OneInputStreamOperatorTestHarness<MultiTableCommittable, 
MultiTableCommittable>
+                testHarness = createRecoverableTestHarness();
+        testHarness.open();
+
+        // write to both tables
+        long timestamp = 0;
+        long cpId = 1;
+        write1.write(GenericRow.of(1, 20L));
+        write2.write(GenericRow.of(1, 20.0, BinaryString.fromString("s2")));
+        testHarness.processElement(
+                getMultiTableCommittable(
+                        firstTable,
+                        new Committable(
+                                cpId,
+                                Committable.Kind.FILE,
+                                write1.prepareCommit(true, cpId).get(0))),
+                timestamp++);
+        testHarness.processElement(
+                getMultiTableCommittable(
+                        secondTable,
+                        new Committable(
+                                cpId,
+                                Committable.Kind.FILE,
+                                write2.prepareCommit(true, cpId).get(0))),
+                timestamp++);
+        testHarness.processWatermark(new Watermark(2048));
         testHarness.snapshot(cpId, timestamp++);
         testHarness.notifyOfCompletedCheckpoint(cpId);
+        
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+                .isEqualTo(2048L);
+        
assertThat(Objects.requireNonNull(table2.snapshotManager().latestSnapshot()).watermark())
+                .isEqualTo(2048L);
+
+        cpId++;
+        testHarness.snapshot(cpId, timestamp);
+        testHarness.notifyOfCompletedCheckpoint(cpId);
+        
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).id())
+                .isEqualTo(2);
+        
assertThat(Objects.requireNonNull(table2.snapshotManager().latestSnapshot()).id())
+                .isEqualTo(1);
+
         testHarness.close();
-        
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
-        
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
     }
 
     // ------------------------------------------------------------------------
@@ -503,7 +569,7 @@ class StoreMultiCommitterTest {
                                 Committable.Kind.FILE,
                                 write2.prepareCommit(true, cpId).get(0))),
                 timestamp++);
-        testHarness.snapshot(cpId, timestamp++);
+        testHarness.snapshot(cpId, timestamp);
         testHarness.notifyOfCompletedCheckpoint(cpId);
 
         OperatorMetricGroup operatorMetricGroup =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
index 127b1e9fa..7c63dc56c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
@@ -51,7 +51,8 @@ class WrappedManifestCommittableSerializerTest {
         WrappedManifestCommittableSerializer serializer = serializer();
         ManifestCommittable committable1 = createManifestCommittable();
         ManifestCommittable committable2 = createManifestCommittable();
-        WrappedManifestCommittable wrappedManifestCommittable = new 
WrappedManifestCommittable();
+        WrappedManifestCommittable wrappedManifestCommittable =
+                new WrappedManifestCommittable(-1, -1);
         wrappedManifestCommittable.putManifestCommittable(
                 Identifier.create("db", "table1"), committable1);
         wrappedManifestCommittable.putManifestCommittable(
@@ -59,7 +60,7 @@ class WrappedManifestCommittableSerializerTest {
         byte[] serialized = serializer.serialize(wrappedManifestCommittable);
         WrappedManifestCommittable deserialize = 
serializer.deserialize(VERSION, serialized);
         Map<Identifier, ManifestCommittable> manifestCommittables =
-                deserialize.getManifestCommittables();
+                deserialize.manifestCommittables();
 
         assertThat(manifestCommittables.size()).isEqualTo(2);
         assertThat(deserialize).isEqualTo(wrappedManifestCommittable);

Reply via email to