This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 313f8b236 [core] Force creating snapshot for tag with auto process time creation (#2188)
313f8b236 is described below
commit 313f8b2368f6e8d7948a1cf28117166a6ddb7b64
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Oct 30 20:04:17 2023 +0800
[core] Force creating snapshot for tag with auto process time creation (#2188)
---
.../manifest/WrappedManifestCommittable.java | 90 +++++++++++-----------
.../apache/paimon/table/sink/TableCommitImpl.java | 4 +
.../org/apache/paimon/tag/TagAutoCreation.java | 4 +
.../org/apache/paimon/tag/TagTimeExtractor.java | 12 +++
.../manifest/WrappedManifestCommittableTest.java | 4 +-
.../org/apache/paimon/flink/sink/Committer.java | 2 +
.../paimon/flink/sink/CommitterOperator.java | 11 ++-
.../apache/paimon/flink/sink/StoreCommitter.java | 5 ++
.../paimon/flink/sink/StoreMultiCommitter.java | 36 +++++++--
.../sink/WrappedManifestCommittableSerializer.java | 11 ++-
.../paimon/flink/sink/CommitterOperatorTest.java | 42 ++++++++++
.../flink/sink/CommitterOperatorTestBase.java | 6 ++
.../paimon/flink/sink/StoreMultiCommitterTest.java | 90 +++++++++++++++++++---
.../WrappedManifestCommittableSerializerTest.java | 5 +-
14 files changed, 248 insertions(+), 74 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
index 5b72f8d8c..c5e8527fb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/WrappedManifestCommittable.java
@@ -28,15 +28,44 @@ import java.util.TreeMap;
/** Manifest commit message. */
public class WrappedManifestCommittable {
- private Map<Identifier, ManifestCommittable> manifestCommittables;
+ private final long checkpointId;
- public WrappedManifestCommittable() {
+ private final long watermark;
+
+ private final Map<Identifier, ManifestCommittable> manifestCommittables;
+
+ public WrappedManifestCommittable(long checkpointId, long watermark) {
+ this.checkpointId = checkpointId;
+ this.watermark = watermark;
this.manifestCommittables =
new TreeMap<>(
Comparator.comparing(Identifier::getDatabaseName)
.thenComparing(Identifier::getObjectName));
}
+ public long checkpointId() {
+ return checkpointId;
+ }
+
+ public long watermark() {
+ return watermark;
+ }
+
+ public Map<Identifier, ManifestCommittable> manifestCommittables() {
+ return manifestCommittables;
+ }
+
+ public ManifestCommittable computeCommittableIfAbsent(
+ Identifier identifier, long checkpointId, long watermark) {
+ return manifestCommittables.computeIfAbsent(
+ identifier, id -> new ManifestCommittable(checkpointId,
watermark));
+ }
+
+ public void putManifestCommittable(
+ Identifier identifier, ManifestCommittable manifestCommittable) {
+ manifestCommittables.put(identifier, manifestCommittable);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -46,58 +75,25 @@ public class WrappedManifestCommittable {
return false;
}
WrappedManifestCommittable that = (WrappedManifestCommittable) o;
-
- if (manifestCommittables.size() != that.manifestCommittables.size()) {
- return false;
- }
-
- for (Map.Entry<Identifier, ManifestCommittable> entry :
manifestCommittables.entrySet()) {
- if (!Objects.equals(entry.getValue(),
that.manifestCommittables.get(entry.getKey()))) {
- return false;
- }
- }
-
- return true;
+ return checkpointId == that.checkpointId
+ && watermark == that.watermark
+ && Objects.equals(manifestCommittables,
that.manifestCommittables);
}
@Override
public int hashCode() {
- return Objects.hash(manifestCommittables.values().toArray(new
Object[0]));
+ return Objects.hash(checkpointId, watermark, manifestCommittables);
}
@Override
public String toString() {
- return String.format(
- "WrappedManifestCommittable {" + "manifestCommittables = %s",
- formatManifestCommittables());
- }
-
- private String formatManifestCommittables() {
- StringBuilder sb = new StringBuilder();
- sb.append("{");
- for (Identifier id : manifestCommittables.keySet()) {
- ManifestCommittable committable = manifestCommittables.get(id);
- sb.append(String.format("%s=%s, ", id.getFullName(),
committable.toString()));
- }
- if (manifestCommittables.size() > 0) {
- sb.delete(sb.length() - 2, sb.length());
- }
- sb.append("}");
- return sb.toString();
- }
-
- public ManifestCommittable computeCommittableIfAbsent(
- Identifier identifier, long checkpointId, long watermark) {
- return manifestCommittables.computeIfAbsent(
- identifier, id -> new ManifestCommittable(checkpointId,
watermark));
- }
-
- public ManifestCommittable putManifestCommittable(
- Identifier identifier, ManifestCommittable manifestCommittable) {
- return manifestCommittables.put(identifier, manifestCommittable);
- }
-
- public Map<Identifier, ManifestCommittable> getManifestCommittables() {
- return manifestCommittables;
+ return "WrappedManifestCommittable{"
+ + "checkpointId="
+ + checkpointId
+ + ", watermark="
+ + watermark
+ + ", manifestCommittables="
+ + manifestCommittables
+ + '}';
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 3d6dc98d7..8fe553e96 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -119,6 +119,10 @@ public class TableCommitImpl implements InnerTableCommit {
this.tableName = tableName;
}
+ public boolean forceCreatingSnapshot() {
+ return tagAutoCreation != null &&
tagAutoCreation.forceCreatingSnapshot();
+ }
+
@Override
public TableCommitImpl withOverwrite(@Nullable Map<String, String>
overwritePartitions) {
this.overwritePartition = overwritePartitions;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 6ec7f1cf2..e4a8e47e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -80,6 +80,10 @@ public class TagAutoCreation {
}
}
+ public boolean forceCreatingSnapshot() {
+ return timeExtractor.forceCreatingSnapshot();
+ }
+
public void run() {
while (true) {
if (snapshotManager.snapshotExists(nextSnapshot)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index dadfde3a7..c7315f6b4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -33,6 +33,8 @@ public interface TagTimeExtractor {
Optional<LocalDateTime> extract(long timeMilli, @Nullable Long watermark);
+ boolean forceCreatingSnapshot();
+
/** Extract time from snapshot time millis. */
class ProcessTimeExtractor implements TagTimeExtractor {
@@ -43,6 +45,11 @@ public interface TagTimeExtractor {
.atZone(ZoneId.systemDefault())
.toLocalDateTime());
}
+
+ @Override
+ public boolean forceCreatingSnapshot() {
+ return true;
+ }
}
/** Extract time from snapshot watermark. */
@@ -63,6 +70,11 @@ public interface TagTimeExtractor {
return Optional.of(
Instant.ofEpochMilli(watermark).atZone(watermarkZoneId).toLocalDateTime());
}
+
+ @Override
+ public boolean forceCreatingSnapshot() {
+ return false;
+ }
}
@Nullable
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
index 67ec4c9c7..7eef1b427 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/WrappedManifestCommittableTest.java
@@ -32,12 +32,12 @@ class WrappedManifestCommittableTest {
ManifestCommittable committable1 = create();
ManifestCommittable committable2 = create();
- WrappedManifestCommittable wrapped1 = new WrappedManifestCommittable();
+ WrappedManifestCommittable wrapped1 = new
WrappedManifestCommittable(-1, -1);
wrapped1.putManifestCommittable(Identifier.create("db", "table1"),
committable1);
wrapped1.putManifestCommittable(Identifier.create("db", "table2"),
committable2);
// add manifest committables in reverse order
- WrappedManifestCommittable wrapped2 = new WrappedManifestCommittable();
+ WrappedManifestCommittable wrapped2 = new
WrappedManifestCommittable(-1, -1);
wrapped2.putManifestCommittable(Identifier.create("db", "table2"),
committable2);
wrapped2.putManifestCommittable(Identifier.create("db", "table1"),
committable1);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index c80fc39e0..a58d97e4c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -35,6 +35,8 @@ import java.util.Map;
*/
public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
+ boolean forceCreatingSnapshot();
+
/** Compute an aggregated committable from a list of committables. */
GlobalCommitT combine(long checkpointId, long watermark, List<CommitT>
committables)
throws IOException;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index b27bb3bb9..16563cb77 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -28,6 +28,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
@@ -154,8 +155,16 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
- committer.commit(committables(headMap));
+ List<GlobalCommitT> committables = committables(headMap);
+ committer.commit(committables);
headMap.clear();
+
+ if (committables.isEmpty()) {
+ if (committer.forceCreatingSnapshot()) {
+ GlobalCommitT commit = toCommittables(checkpointId,
Collections.emptyList());
+ committer.commit(Collections.singletonList(commit));
+ }
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 1025fa930..7857242bd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -60,6 +60,11 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
return committerMetrics;
}
+ @Override
+ public boolean forceCreatingSnapshot() {
+ return commit.forceCreatingSnapshot();
+ }
+
@Override
public ManifestCommittable combine(
long checkpointId, long watermark, List<Committable> committables)
{
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index ad420de68..089311208 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -79,10 +79,16 @@ public class StoreMultiCommitter
this.isCompactJob = isCompactJob;
}
+ @Override
+ public boolean forceCreatingSnapshot() {
+ return true;
+ }
+
@Override
public WrappedManifestCommittable combine(
long checkpointId, long watermark, List<MultiTableCommittable>
committables) {
- WrappedManifestCommittable wrappedManifestCommittable = new
WrappedManifestCommittable();
+ WrappedManifestCommittable wrappedManifestCommittable =
+ new WrappedManifestCommittable(checkpointId, watermark);
for (MultiTableCommittable committable : committables) {
ManifestCommittable manifestCommittable =
@@ -109,15 +115,29 @@ public class StoreMultiCommitter
@Override
public void commit(List<WrappedManifestCommittable> committables)
throws IOException, InterruptedException {
+ if (committables.isEmpty()) {
+ return;
+ }
// key by table id
Map<Identifier, List<ManifestCommittable>> committableMap =
groupByTable(committables);
-
- for (Map.Entry<Identifier, List<ManifestCommittable>> entry :
committableMap.entrySet()) {
- Identifier tableId = entry.getKey();
- List<ManifestCommittable> committableList = entry.getValue();
- StoreCommitter committer = getStoreCommitter(tableId);
- committer.commit(committableList);
+ committableMap.keySet().forEach(this::getStoreCommitter);
+
+ long checkpointId = committables.get(0).checkpointId();
+ long watermark = committables.get(0).watermark();
+ for (Map.Entry<Identifier, StoreCommitter> entry :
tableCommitters.entrySet()) {
+ List<ManifestCommittable> committableList =
committableMap.get(entry.getKey());
+ StoreCommitter committer = entry.getValue();
+ if (committableList != null) {
+ committer.commit(committableList);
+ } else {
+ // try best to commit empty snapshot, but tableCommitters may
not contain all tables
+ if (committer.forceCreatingSnapshot()) {
+ ManifestCommittable combine =
+ committer.combine(checkpointId, watermark,
Collections.emptyList());
+ committer.commit(Collections.singletonList(combine));
+ }
+ }
}
}
@@ -138,7 +158,7 @@ public class StoreMultiCommitter
.flatMap(
wrapped -> {
Map<Identifier, ManifestCommittable>
manifestCommittables =
- wrapped.getManifestCommittables();
+ wrapped.manifestCommittables();
return manifestCommittables.entrySet().stream()
.map(entry -> Tuple2.of(entry.getKey(),
entry.getValue()));
})
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
index 65eb12a13..324452ce5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java
@@ -54,8 +54,11 @@ public class WrappedManifestCommittableSerializer
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
+ view.writeLong(wrapped.checkpointId());
+ view.writeLong(wrapped.watermark());
+
// Serialize ManifestCommittable map inside WrappedManifestCommittable
- Map<Identifier, ManifestCommittable> map =
wrapped.getManifestCommittables();
+ Map<Identifier, ManifestCommittable> map =
wrapped.manifestCommittables();
view.writeInt(map.size());
for (Map.Entry<Identifier, ManifestCommittable> entry :
map.entrySet()) {
byte[] serializedKey =
entry.getKey().getFullName().getBytes(StandardCharsets.UTF_8);
@@ -83,9 +86,13 @@ public class WrappedManifestCommittableSerializer
DataInputDeserializer view = new DataInputDeserializer(serialized);
+ long checkpointId = view.readLong();
+ long watermark = view.readLong();
+
// Deserialize ManifestCommittable map inside
WrappedManifestCommittable
int mapSize = view.readInt();
- WrappedManifestCommittable wrappedManifestCommittable = new
WrappedManifestCommittable();
+ WrappedManifestCommittable wrappedManifestCommittable =
+ new WrappedManifestCommittable(checkpointId, watermark);
for (int i = 0; i < mapSize; i++) {
int keyLength = view.readInt();
byte[] serializedKey = new byte[keyLength];
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 9aa1fb2d6..8a9bf96e2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
@@ -332,6 +334,46 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
}
+ @Test
+ public void testEmptyCommit() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createRecoverableTestHarness(table);
+ testHarness.open();
+
+ testHarness.snapshot(1, 1);
+ testHarness.notifyOfCompletedCheckpoint(1);
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNull();
+ }
+
+ @Test
+ public void testEmptyCommitWithProcessTimeTag() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ options ->
+ options.set(
+ CoreOptions.TAG_AUTOMATIC_CREATION,
+
CoreOptions.TagCreationMode.PROCESS_TIME));
+
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createRecoverableTestHarness(table);
+ testHarness.open();
+
+ testHarness.snapshot(1, 1);
+ testHarness.notifyOfCompletedCheckpoint(1);
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+ assertThat(snapshot.id()).isEqualTo(1);
+
+ testHarness.snapshot(2, 2);
+ testHarness.notifyOfCompletedCheckpoint(2);
+ snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+ assertThat(snapshot.id()).isEqualTo(2);
+ }
+
// ------------------------------------------------------------------------
// Metrics tests
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
index c383ff063..d9454f736 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTestBase.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -87,8 +88,13 @@ public abstract class CommitterOperatorTestBase {
}
protected FileStoreTable createFileStoreTable() throws Exception {
+ return createFileStoreTable(options -> {});
+ }
+
+ protected FileStoreTable createFileStoreTable(Consumer<Options>
setOptions) throws Exception {
Options conf = new Options();
conf.set(CoreOptions.PATH, tablePath.toString());
+ setOptions.accept(conf);
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
schemaManager.createTable(
new Schema(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index aa3a8229b..8d2666d58 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -66,24 +67,26 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
class StoreMultiCommitterTest {
+
private String initialCommitUser;
private Path warehouse;
private Catalog.Loader catalogLoader;
private Catalog catalog;
- private String databaseName;
private Identifier firstTable;
private Identifier secondTable;
private Path firstTablePath;
private Path secondTablePath;
@TempDir public java.nio.file.Path tempDir;
- private void createTestTables(Catalog catalog, Tuple2<Identifier,
Schema>... tableSpecs)
+ @SafeVarargs
+ private final void createTestTables(Catalog catalog, Tuple2<Identifier,
Schema>... tableSpecs)
throws Exception {
for (Tuple2<Identifier, Schema> spec : tableSpecs) {
catalog.createTable(spec.f0, spec.f1, false);
@@ -94,7 +97,7 @@ class StoreMultiCommitterTest {
public void beforeEach() throws Exception {
initialCommitUser = UUID.randomUUID().toString();
warehouse = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
- databaseName = "test_db";
+ String databaseName = "test_db";
firstTable = Identifier.create(databaseName, "test_table1");
secondTable = Identifier.create(databaseName, "test_table2");
@@ -112,21 +115,24 @@ class StoreMultiCommitterTest {
DataTypes.INT(), DataTypes.DOUBLE(),
DataTypes.VARCHAR(5),
},
new String[] {"a", "b", "c"});
- Options conf = new Options();
+ Options firstOptions = new Options();
+ firstOptions.set(
+ CoreOptions.TAG_AUTOMATIC_CREATION,
CoreOptions.TagCreationMode.PROCESS_TIME);
Schema firstTableSchema =
new Schema(
rowType1.getFields(),
Collections.emptyList(),
Collections.emptyList(),
- conf.toMap(),
+ firstOptions.toMap(),
"");
+
Schema secondTableSchema =
new Schema(
rowType2.getFields(),
Collections.emptyList(),
Collections.emptyList(),
- conf.toMap(),
+ Collections.emptyMap(),
"");
createTestTables(
catalog,
@@ -140,6 +146,7 @@ class StoreMultiCommitterTest {
// Recoverable operator tests
// ------------------------------------------------------------------------
+ @SuppressWarnings("CatchMayIgnoreException")
@Test
public void testFailIntentionallyAfterRestore() throws Exception {
FileStoreTable table = (FileStoreTable) catalog.getTable(firstTable);
@@ -204,7 +211,7 @@ class StoreMultiCommitterTest {
// test restore and fail for second table
// checkpoint is completed but not notified, so no snapshot is
committed
- snapshot = testHarness.snapshot(1, timestamp++);
+ snapshot = testHarness.snapshot(1, timestamp);
assertThat(table.snapshotManager().latestSnapshotId()).isNull();
testHarness.close();
@@ -396,7 +403,7 @@ class StoreMultiCommitterTest {
secondTable, new Committable(2,
Committable.Kind.FILE, committable)),
timestamp++);
}
- testHarness.snapshot(3, timestamp++);
+ testHarness.snapshot(3, timestamp);
testHarness.notifyOfCompletedCheckpoint(3);
write1.close();
@@ -437,7 +444,8 @@ class StoreMultiCommitterTest {
testHarness.processWatermark(new Watermark(1024));
testHarness.snapshot(cpId, timestamp++);
testHarness.notifyOfCompletedCheckpoint(cpId);
-
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
+
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+ .isEqualTo(1024L);
assertThat(table2.snapshotManager().latestSnapshot()).isNull();
// write to both tables on second watermark
@@ -453,11 +461,69 @@ class StoreMultiCommitterTest {
write1.prepareCommit(true, cpId).get(0))),
timestamp++);
testHarness.processWatermark(new Watermark(2048));
+ testHarness.snapshot(cpId, timestamp);
+ testHarness.notifyOfCompletedCheckpoint(cpId);
+ testHarness.close();
+
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+ .isEqualTo(2048L);
+
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+ .isEqualTo(2048L);
+ }
+
+ @Test
+ public void testEmptyCommit() throws Exception {
+ // table1: TagCreationMode.PROCESS_TIME
+ FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
+ FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
+
+ StreamTableWrite write1 =
+
table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ StreamTableWrite write2 =
+
table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ OneInputStreamOperatorTestHarness<MultiTableCommittable,
MultiTableCommittable>
+ testHarness = createRecoverableTestHarness();
+ testHarness.open();
+
+ // write to both tables
+ long timestamp = 0;
+ long cpId = 1;
+ write1.write(GenericRow.of(1, 20L));
+ write2.write(GenericRow.of(1, 20.0, BinaryString.fromString("s2")));
+ testHarness.processElement(
+ getMultiTableCommittable(
+ firstTable,
+ new Committable(
+ cpId,
+ Committable.Kind.FILE,
+ write1.prepareCommit(true, cpId).get(0))),
+ timestamp++);
+ testHarness.processElement(
+ getMultiTableCommittable(
+ secondTable,
+ new Committable(
+ cpId,
+ Committable.Kind.FILE,
+ write2.prepareCommit(true, cpId).get(0))),
+ timestamp++);
+ testHarness.processWatermark(new Watermark(2048));
testHarness.snapshot(cpId, timestamp++);
testHarness.notifyOfCompletedCheckpoint(cpId);
+
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).watermark())
+ .isEqualTo(2048L);
+
assertThat(Objects.requireNonNull(table2.snapshotManager().latestSnapshot()).watermark())
+ .isEqualTo(2048L);
+
+ cpId++;
+ testHarness.snapshot(cpId, timestamp);
+ testHarness.notifyOfCompletedCheckpoint(cpId);
+
assertThat(Objects.requireNonNull(table1.snapshotManager().latestSnapshot()).id())
+ .isEqualTo(2);
+
assertThat(Objects.requireNonNull(table2.snapshotManager().latestSnapshot()).id())
+ .isEqualTo(1);
+
testHarness.close();
-
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
-
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
}
// ------------------------------------------------------------------------
@@ -503,7 +569,7 @@ class StoreMultiCommitterTest {
Committable.Kind.FILE,
write2.prepareCommit(true, cpId).get(0))),
timestamp++);
- testHarness.snapshot(cpId, timestamp++);
+ testHarness.snapshot(cpId, timestamp);
testHarness.notifyOfCompletedCheckpoint(cpId);
OperatorMetricGroup operatorMetricGroup =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
index 127b1e9fa..7c63dc56c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
@@ -51,7 +51,8 @@ class WrappedManifestCommittableSerializerTest {
WrappedManifestCommittableSerializer serializer = serializer();
ManifestCommittable committable1 = createManifestCommittable();
ManifestCommittable committable2 = createManifestCommittable();
- WrappedManifestCommittable wrappedManifestCommittable = new
WrappedManifestCommittable();
+ WrappedManifestCommittable wrappedManifestCommittable =
+ new WrappedManifestCommittable(-1, -1);
wrappedManifestCommittable.putManifestCommittable(
Identifier.create("db", "table1"), committable1);
wrappedManifestCommittable.putManifestCommittable(
@@ -59,7 +60,7 @@ class WrappedManifestCommittableSerializerTest {
byte[] serialized = serializer.serialize(wrappedManifestCommittable);
WrappedManifestCommittable deserialize =
serializer.deserialize(VERSION, serialized);
Map<Identifier, ManifestCommittable> manifestCommittables =
- deserialize.getManifestCommittables();
+ deserialize.manifestCommittables();
assertThat(manifestCommittables.size()).isEqualTo(2);
assertThat(deserialize).isEqualTo(wrappedManifestCommittable);