This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7599a41 [FLINK-26184] Migrate StoreSink to Sink V2
7599a41 is described below
commit 7599a41955bf4931f3dc35273336829e2cc942f4
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Feb 17 19:34:42 2022 +0800
[FLINK-26184] Migrate StoreSink to Sink V2
This closes #22
---
flink-table-store-connector/pom.xml | 8 +
.../table/store/connector/sink/Committable.java | 79 ++++++
.../connector/sink/CommittableSerializer.java | 54 ++++
...{LocalCommittable.java => FileCommittable.java} | 8 +-
...ializer.java => FileCommittableSerializer.java} | 12 +-
.../store/connector/sink/GlobalCommittable.java | 44 ++++
.../sink/GlobalCommittableSerializer.java | 94 +++++++
.../store/connector/sink/LogOffsetCommittable.java | 51 ++++
.../store/connector/sink/StoreGlobalCommitter.java | 94 ++++---
.../table/store/connector/sink/StoreSink.java | 79 +++---
.../store/connector/sink/StoreSinkWriter.java | 63 +++--
.../connector/sink/global/GlobalCommitter.java | 45 ++++
.../sink/global/GlobalCommitterOperator.java | 152 +++++++++++
.../sink/global/GlobalCommittingSink.java | 56 +++++
.../global/GlobalCommittingSinkTranslator.java | 60 +++++
.../connector/sink/CommittableSerializerTest.java | 38 +++
...est.java => FileCommittableSerializerTest.java} | 12 +-
.../sink/GlobalCommittableSerializerTest.java | 69 +++++
.../connector/sink/LogOffsetCommittableTest.java | 35 +++
.../table/store/connector/sink/StoreSinkTest.java | 35 ++-
.../table/store/connector/sink/TestFileStore.java | 18 +-
.../sink/global/GlobalCommitterOperatorTest.java | 278 +++++++++++++++++++++
.../apache/flink/table/store/file/Snapshot.java | 16 +-
.../store/file/manifest/ManifestCommittable.java | 58 +++--
.../manifest/ManifestCommittableSerializer.java | 22 +-
.../store/file/operation/FileStoreCommitImpl.java | 42 ++--
.../ManifestCommittableSerializerTest.java | 34 ++-
.../store/file/operation/OperationTestUtils.java | 7 +-
.../store/file/operation/TestCommitThread.java | 11 +-
29 files changed, 1394 insertions(+), 180 deletions(-)
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index a578b80..70ccea3 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -80,6 +80,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
new file mode 100644
index 0000000..b4ba992
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+/** Committable produced by {@link StoreSinkWriter}. */
+public class Committable {
+
+ private final Kind kind;
+
+ private final byte[] wrappedCommittable;
+
+ private final int serializerVersion;
+
+ public Committable(Kind kind, byte[] wrappedCommittable, int serializerVersion) {
+ this.kind = kind;
+ this.wrappedCommittable = wrappedCommittable;
+ this.serializerVersion = serializerVersion;
+ }
+
+ public Kind kind() {
+ return kind;
+ }
+
+ public byte[] wrappedCommittable() {
+ return wrappedCommittable;
+ }
+
+ public int serializerVersion() {
+ return serializerVersion;
+ }
+
+ enum Kind {
+ FILE((byte) 0),
+
+ LOG((byte) 1),
+
+ LOG_OFFSET((byte) 2);
+
+ private final byte value;
+
+ Kind(byte value) {
+ this.value = value;
+ }
+
+ public byte toByteValue() {
+ return value;
+ }
+
+ public static Kind fromByteValue(byte value) {
+ switch (value) {
+ case 0:
+ return FILE;
+ case 1:
+ return LOG;
+ case 2:
+ return LOG_OFFSET;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported byte value '" + value + "' for value
kind.");
+ }
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
new file mode 100644
index 0000000..82627ea
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link Committable}. */
+public class CommittableSerializer implements SimpleVersionedSerializer<Committable> {
+
+ public static final CommittableSerializer INSTANCE = new CommittableSerializer();
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(Committable committable) {
+ byte[] wrapped = committable.wrappedCommittable();
+ return ByteBuffer.allocate(1 + wrapped.length + 4)
+ .put(committable.kind().toByteValue())
+ .put(wrapped)
+ .putInt(committable.serializerVersion())
+ .array();
+ }
+
+ @Override
+ public Committable deserialize(int i, byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
+ byte[] wrapped = new byte[bytes.length - 5];
+ buffer.get(wrapped);
+ int version = buffer.getInt();
+ return new Committable(kind, wrapped, version);
+ }
+}
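Note on the wire format above: CommittableSerializer writes [1-byte kind][wrapped payload][4-byte wrapped-serializer version], which is why deserialize() sizes the payload as bytes.length - 5. A minimal round-trip sketch (illustrative only, not part of this commit; Kind is package-private, so like the tests further below this assumes code living in the same package):

    byte[] payload = new byte[] {1, 2, 3};
    Committable committable = new Committable(Committable.Kind.FILE, payload, 1);
    byte[] bytes = CommittableSerializer.INSTANCE.serialize(committable);
    // bytes.length == 1 (kind) + 3 (payload) + 4 (version int) == 8
    Committable copy =
            CommittableSerializer.INSTANCE.deserialize(
                    CommittableSerializer.INSTANCE.getVersion(), bytes);
    // copy.kind() == Kind.FILE, copy.serializerVersion() == 1, and the
    // payload bytes round-trip unchanged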
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
similarity index 90%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
index c6bbeca..e088d1d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.store.file.mergetree.Increment;
import java.util.Objects;
-/** Local committable for sink. */
-public class LocalCommittable {
+/** File committable for sink. */
+public class FileCommittable {
private final BinaryRowData partition;
@@ -32,7 +32,7 @@ public class LocalCommittable {
private final Increment increment;
- public LocalCommittable(BinaryRowData partition, int bucket, Increment increment) {
+ public FileCommittable(BinaryRowData partition, int bucket, Increment increment) {
this.partition = partition;
this.bucket = bucket;
this.increment = increment;
@@ -58,7 +58,7 @@ public class LocalCommittable {
if (o == null || getClass() != o.getClass()) {
return false;
}
- LocalCommittable that = (LocalCommittable) o;
+ FileCommittable that = (FileCommittable) o;
return bucket == that.bucket
&& Objects.equals(partition, that.partition)
&& Objects.equals(increment, that.increment);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
similarity index 85%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
index 6bf19ca..69bb44b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
@@ -29,13 +29,13 @@ import org.apache.flink.table.types.logical.RowType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-/** {@link SimpleVersionedSerializer} for {@link LocalCommittable}. */
-public class LocalCommittableSerializer implements SimpleVersionedSerializer<LocalCommittable> {
+/** {@link SimpleVersionedSerializer} for {@link FileCommittable}. */
+public class FileCommittableSerializer implements SimpleVersionedSerializer<FileCommittable> {
private final BinaryRowDataSerializer partSerializer;
private final SstFileMetaSerializer sstSerializer;
- public LocalCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+ public FileCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
}
@@ -46,7 +46,7 @@ public class LocalCommittableSerializer implements SimpleVersionedSerializer<Loc
}
@Override
- public byte[] serialize(LocalCommittable obj) throws IOException {
+ public byte[] serialize(FileCommittable obj) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
partSerializer.serialize(obj.partition(), view);
@@ -58,9 +58,9 @@ public class LocalCommittableSerializer implements SimpleVersionedSerializer<Loc
}
@Override
- public LocalCommittable deserialize(int version, byte[] serialized) throws IOException {
+ public FileCommittable deserialize(int version, byte[] serialized) throws IOException {
DataInputDeserializer view = new DataInputDeserializer(serialized);
- return new LocalCommittable(
+ return new FileCommittable(
partSerializer.deserialize(view),
view.readInt(),
new Increment(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
new file mode 100644
index 0000000..ac49d15
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.util.List;
+
+/** Global aggregated committable for {@link StoreGlobalCommitter}. */
+public class GlobalCommittable<LogCommT> {
+
+ private final List<LogCommT> logCommittables;
+
+ private final ManifestCommittable fileCommittable;
+
+ public GlobalCommittable(List<LogCommT> logCommittables, ManifestCommittable fileCommittable) {
+ this.logCommittables = logCommittables;
+ this.fileCommittable = fileCommittable;
+ }
+
+ public List<LogCommT> logCommittables() {
+ return logCommittables;
+ }
+
+ public ManifestCommittable fileCommittable() {
+ return fileCommittable;
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
new file mode 100644
index 0000000..ea180d2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link SimpleVersionedSerializer} for {@link GlobalCommittable}. */
+public class GlobalCommittableSerializer<LogCommT>
+ implements SimpleVersionedSerializer<GlobalCommittable<LogCommT>> {
+
+ private final SimpleVersionedSerializer<LogCommT> logSerializer;
+
+ private final ManifestCommittableSerializer fileSerializer;
+
+ public GlobalCommittableSerializer(
+ SimpleVersionedSerializer<LogCommT> logSerializer,
+ ManifestCommittableSerializer fileSerializer) {
+ this.logSerializer = logSerializer;
+ this.fileSerializer = fileSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(GlobalCommittable<LogCommT> committable) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+
+ view.writeInt(logSerializer.getVersion());
+ view.writeInt(committable.logCommittables().size());
+ for (LogCommT commT : committable.logCommittables()) {
+ byte[] bytes = logSerializer.serialize(commT);
+ view.writeInt(bytes.length);
+ view.write(bytes);
+ }
+
+ view.writeInt(fileSerializer.getVersion());
+ byte[] bytes = fileSerializer.serialize(committable.fileCommittable());
+ view.writeInt(bytes.length);
+ view.write(bytes);
+
+ return out.toByteArray();
+ }
+
+ @Override
+ public GlobalCommittable<LogCommT> deserialize(int version, byte[] serialized)
+ throws IOException {
+ DataInputDeserializer view = new DataInputDeserializer(serialized);
+
+ int logVersion = view.readInt();
+ int logSize = view.readInt();
+ List<LogCommT> logCommTList = new ArrayList<>();
+ for (int i = 0; i < logSize; i++) {
+ byte[] bytes = new byte[view.readInt()];
+ view.read(bytes);
+ logCommTList.add(logSerializer.deserialize(logVersion, bytes));
+ }
+
+ int fileVersion = view.readInt();
+ byte[] bytes = new byte[view.readInt()];
+ view.read(bytes);
+ ManifestCommittable file = fileSerializer.deserialize(fileVersion, bytes);
+
+ return new GlobalCommittable<>(logCommTList, file);
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
new file mode 100644
index 0000000..68b62d8
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import java.nio.ByteBuffer;
+
+/** Log offset committable for a bucket. */
+public class LogOffsetCommittable {
+
+ private final int bucket;
+
+ private final long offset;
+
+ public LogOffsetCommittable(int bucket, long offset) {
+ this.bucket = bucket;
+ this.offset = offset;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public byte[] toBytes() {
+ return ByteBuffer.allocate(12).putInt(bucket).putLong(offset).array();
+ }
+
+ public static LogOffsetCommittable fromBytes(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ return new LogOffsetCommittable(buffer.getInt(), buffer.getLong());
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index e5dde92..a70b3b1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -18,29 +18,31 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreExpire;
import javax.annotation.Nullable;
-import java.util.Collections;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
+import java.util.stream.Collectors;
/** {@link GlobalCommitter} for dynamic store. */
-public class StoreGlobalCommitter
- implements GlobalCommitter<LocalCommittable, ManifestCommittable> {
+public class StoreGlobalCommitter<LogCommT>
+ implements GlobalCommitter<Committable, GlobalCommittable<LogCommT>> {
private final FileStoreCommit fileStoreCommit;
private final FileStoreExpire fileStoreExpire;
+ private final FileCommittableSerializer fileCommitSerializer;
+
@Nullable private final CatalogLock lock;
@Nullable private final Map<String, String> overwritePartition;
@@ -48,55 +50,77 @@ public class StoreGlobalCommitter
public StoreGlobalCommitter(
FileStoreCommit fileStoreCommit,
FileStoreExpire fileStoreExpire,
+ FileCommittableSerializer fileCommitSerializer,
@Nullable CatalogLock lock,
@Nullable Map<String, String> overwritePartition) {
this.fileStoreCommit = fileStoreCommit;
this.fileStoreExpire = fileStoreExpire;
+ this.fileCommitSerializer = fileCommitSerializer;
this.lock = lock;
this.overwritePartition = overwritePartition;
}
@Override
- public List<ManifestCommittable> filterRecoveredCommittables(
- List<ManifestCommittable> globalCommittables) {
- return fileStoreCommit.filterCommitted(globalCommittables);
+ public void close() throws Exception {
+ if (lock != null) {
+ lock.close();
+ }
}
@Override
- public ManifestCommittable combine(List<LocalCommittable> committables) {
- ManifestCommittable globalCommittable = new ManifestCommittable();
- committables.forEach(
- committable ->
- globalCommittable.add(
- committable.partition(),
- committable.bucket(),
- committable.increment()));
- return globalCommittable;
+ public List<GlobalCommittable<LogCommT>> filterRecoveredCommittables(
+ List<GlobalCommittable<LogCommT>> globalCommittables) {
+ List<ManifestCommittable> filtered =
+ fileStoreCommit.filterCommitted(
+ globalCommittables.stream()
+ .map(GlobalCommittable::fileCommittable)
+ .collect(Collectors.toList()));
+ return globalCommittables.stream()
+ .filter(c -> filtered.contains(c.fileCommittable()))
+ .collect(Collectors.toList());
}
@Override
- public List<ManifestCommittable> commit(List<ManifestCommittable> globalCommittables) {
- Map<String, String> properties = new HashMap<>();
- if (overwritePartition == null) {
- for (ManifestCommittable committable : globalCommittables) {
- fileStoreCommit.commit(committable, properties);
+ public GlobalCommittable<LogCommT> combine(long checkpointId, List<Committable> committables)
+ throws IOException {
+ List<LogCommT> logCommittables = new ArrayList<>();
+ ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
+ for (Committable committable : committables) {
+ switch (committable.kind()) {
+ case FILE:
+ FileCommittable file =
+ fileCommitSerializer.deserialize(
+ committable.serializerVersion(),
+ committable.wrappedCommittable());
+ fileCommittable.addFileCommittable(
+ file.partition(), file.bucket(), file.increment());
+ break;
+ case LOG_OFFSET:
+ LogOffsetCommittable offset =
+ LogOffsetCommittable.fromBytes(committable.wrappedCommittable());
+ fileCommittable.addLogOffset(offset.bucket(), offset.offset());
+ break;
+ case LOG:
+ throw new UnsupportedOperationException();
}
- } else {
- checkArgument(
- globalCommittables.size() == 1, "overwrite is only supported in batch mode.");
- fileStoreCommit.overwrite(overwritePartition, globalCommittables.get(0), properties);
}
- fileStoreExpire.expire();
- return Collections.emptyList();
+ return new GlobalCommittable<>(logCommittables, fileCommittable);
}
@Override
- public void endOfInput() {}
-
- @Override
- public void close() throws Exception {
- if (lock != null) {
- lock.close();
+ public void commit(List<GlobalCommittable<LogCommT>> committables) {
+ if (overwritePartition == null) {
+ for (GlobalCommittable<LogCommT> committable : committables) {
+ fileStoreCommit.commit(committable.fileCommittable(), new HashMap<>());
+ }
+ } else {
+ for (GlobalCommittable<LogCommT> committable : committables) {
+ fileStoreCommit.overwrite(
+ overwritePartition, committable.fileCommittable(), new HashMap<>());
+ }
}
+
+ // TODO introduce check interval
+ fileStoreExpire.expire();
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 8e7652d..c61ce0c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,15 +18,14 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.Lock;
@@ -35,15 +34,17 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
-import java.util.List;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.Callable;
import static org.apache.flink.table.store.utils.ProjectionUtils.project;
/** {@link Sink} of dynamic store. */
-public class StoreSink implements Sink<RowData, LocalCommittable, Void, ManifestCommittable> {
+public class StoreSink<WriterStateT, LogCommT>
+ implements StatefulSink<RowData, WriterStateT>,
+ GlobalCommittingSink<RowData, Committable, GlobalCommittable<LogCommT>> {
private static final long serialVersionUID = 1L;
@@ -83,26 +84,27 @@ public class StoreSink implements Sink<RowData, LocalCommittable, Void, Manifest
}
@Override
- public StoreSinkWriter createWriter(InitContext initContext, List<Void> list) {
- SinkRecordConverter recordConverter =
- new SinkRecordConverter(numBucket, rowType, partitions, keys);
- return new StoreSinkWriter(
- fileStore.newWrite(), recordConverter, overwritePartition != null);
+ public StoreSinkWriter<WriterStateT> createWriter(InitContext initContext) throws IOException {
+ return restoreWriter(initContext, null);
}
@Override
- public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
- return Optional.empty();
+ public StoreSinkWriter<WriterStateT> restoreWriter(
+ InitContext initContext, Collection<WriterStateT> states) throws IOException {
+ return new StoreSinkWriter<>(
+ fileStore.newWrite(),
+ new SinkRecordConverter(numBucket, rowType, partitions, keys),
+ fileCommitSerializer(),
+ overwritePartition != null);
}
@Override
- public Optional<Committer<LocalCommittable>> createCommitter() {
- return Optional.empty();
+ public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
+ return new NoOutputSerializer<>();
}
@Override
- public Optional<GlobalCommitter<LocalCommittable, ManifestCommittable>>
- createGlobalCommitter() {
+ public StoreGlobalCommitter<LogCommT> createCommitter() throws IOException {
FileStoreCommit commit = fileStore.newCommit();
CatalogLock lock;
if (lockFactory == null) {
@@ -120,22 +122,43 @@ public class StoreSink implements Sink<RowData, LocalCommittable, Void, Manifest
}
});
}
- return Optional.of(
- new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, overwritePartition));
+
+ return new StoreGlobalCommitter<>(
+ commit, fileStore.newExpire(), fileCommitSerializer(), lock, overwritePartition);
}
@Override
- public Optional<SimpleVersionedSerializer<LocalCommittable>> getCommittableSerializer() {
- return Optional.of(
- new LocalCommittableSerializer(
- project(rowType, partitions), project(rowType, keys), rowType));
+ public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
+ return CommittableSerializer.INSTANCE;
}
@Override
- public Optional<SimpleVersionedSerializer<ManifestCommittable>>
- getGlobalCommittableSerializer() {
- return Optional.of(
+ public GlobalCommittableSerializer<LogCommT> getGlobalCommittableSerializer() {
+ ManifestCommittableSerializer fileCommSerializer =
new ManifestCommittableSerializer(
- project(rowType, partitions), project(rowType, keys), rowType));
+ project(rowType, partitions), project(rowType, keys), rowType);
+ SimpleVersionedSerializer<LogCommT> logCommitSerializer = new NoOutputSerializer<>();
+ return new GlobalCommittableSerializer<>(logCommitSerializer, fileCommSerializer);
+ }
+
+ private FileCommittableSerializer fileCommitSerializer() {
+ return new FileCommittableSerializer(
+ project(rowType, partitions), project(rowType, keys), rowType);
+ }
+
+ private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
+ private NoOutputSerializer() {}
+
+ public int getVersion() {
+ return 1;
+ }
+
+ public byte[] serialize(T obj) {
+ throw new IllegalStateException("Should not serialize anything");
+ }
+
+ public T deserialize(int version, byte[] serialized) {
+ throw new IllegalStateException("Should not deserialize anything");
+ }
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index f30a88b..8cc88a1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -19,7 +19,9 @@
package org.apache.flink.table.store.connector.sink;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -28,10 +30,10 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.sink.SinkRecord;
import org.apache.flink.table.store.sink.SinkRecordConverter;
-import org.apache.flink.types.RowKind;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,12 +41,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** A {@link SinkWriter} for dynamic store. */
-public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Void> {
+public class StoreSinkWriter<WriterStateT>
+ implements StatefulSinkWriter<RowData, WriterStateT>,
+ PrecommittingSinkWriter<RowData, Committable> {
private final FileStoreWrite fileStoreWrite;
private final SinkRecordConverter recordConverter;
+ private final FileCommittableSerializer fileCommitSerializer;
+
private final boolean overwrite;
private final ExecutorService compactExecutor;
@@ -52,9 +58,13 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
public StoreSinkWriter(
- FileStoreWrite fileStoreWrite, SinkRecordConverter recordConverter, boolean overwrite) {
+ FileStoreWrite fileStoreWrite,
+ SinkRecordConverter recordConverter,
+ FileCommittableSerializer fileCommitSerializer,
+ boolean overwrite) {
this.fileStoreWrite = fileStoreWrite;
this.recordConverter = recordConverter;
+ this.fileCommitSerializer = fileCommitSerializer;
this.overwrite = overwrite;
this.compactExecutor = Executors.newSingleThreadScheduledExecutor();
this.writers = new HashMap<>();
@@ -76,8 +86,7 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
}
@Override
- public void write(RowData rowData, Context context) throws IOException {
- RowKind rowKind = rowData.getRowKind();
+ public void write(RowData rowData, Context context) throws IOException, InterruptedException {
SinkRecord record = recordConverter.convert(rowData);
RecordWriter writer = getWriter(record.partition(), record.bucket());
try {
@@ -85,7 +94,6 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
} catch (Exception e) {
throw new IOException(e);
}
- rowData.setRowKind(rowKind);
}
private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
@@ -110,23 +118,31 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
}
@Override
- public List<LocalCommittable> prepareCommit(boolean flush) throws IOException {
- try {
- return prepareCommit();
- } catch (Exception e) {
- throw new IOException(e);
- }
+ public void flush(boolean endOfInput) {}
+
+ @Override
+ public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
+ return Collections.emptyList();
}
- private List<LocalCommittable> prepareCommit() throws Exception {
- List<LocalCommittable> committables = new ArrayList<>();
+ @Override
+ public List<Committable> prepareCommit() throws IOException {
+ List<Committable> committables = new ArrayList<>();
for (BinaryRowData partition : writers.keySet()) {
Map<Integer, RecordWriter> buckets = writers.get(partition);
for (Integer bucket : buckets.keySet()) {
RecordWriter writer = buckets.get(bucket);
- LocalCommittable committable =
- new LocalCommittable(partition, bucket, writer.prepareCommit());
- committables.add(committable);
+ FileCommittable committable;
+ try {
+ committable = new FileCommittable(partition, bucket, writer.prepareCommit());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ committables.add(
+ new Committable(
+ Committable.Kind.FILE,
+ fileCommitSerializer.serialize(committable),
+ fileCommitSerializer.getVersion()));
// clear if no update
// we need a mechanism to clear writers, otherwise there will be more and more
@@ -141,12 +157,17 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
writers.remove(partition);
}
}
+
return committables;
}
- private void closeWriter(RecordWriter writer) throws Exception {
- writer.sync();
- writer.close();
+ private void closeWriter(RecordWriter writer) throws IOException {
+ try {
+ writer.sync();
+ writer.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
new file mode 100644
index 0000000..90eee7b
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
+ * which we call global committable (see {@link #combine}).
+ *
+ * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
+ *
+ * @param <CommT> The type of information needed to commit data staged by the sink
+ * @param <GlobalCommT> The type of the aggregated committable
+ */
+public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
+
+ /** Find out which global committables need to be retried when recovering from the failure. */
+ List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables)
+ throws IOException;
+
+ /** Compute an aggregated committable from a list of committables. */
+ GlobalCommT combine(long checkpointId, List<CommT> committables) throws IOException;
+
+ /** Commits the given {@link GlobalCommT}. */
+ void commit(List<GlobalCommT> globalCommittables) throws IOException, InterruptedException;
+}
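To make the contract concrete, a minimal implementation sketch (purely illustrative, not part of this commit): a committer over plain strings that retries everything on recovery, folds one checkpoint's committables into a single string in combine(), and "commits" by printing.

    package org.apache.flink.table.store.connector.sink.global;

    import java.util.List;

    // Illustrative GlobalCommitter: strings in, one concatenated string per checkpoint out.
    class ConcatGlobalCommitter implements GlobalCommitter<String, String> {

        @Override
        public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
            return globalCommittables; // retry every recovered global committable
        }

        @Override
        public String combine(long checkpointId, List<String> committables) {
            return checkpointId + ":" + String.join(",", committables);
        }

        @Override
        public void commit(List<String> globalCommittables) {
            globalCommittables.forEach(System.out::println); // "commit" = print
        }

        @Override
        public void close() {}
    }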
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
new file mode 100644
index 0000000..ec48d27
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An operator that processes committables of a {@link Sink}. */
+public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
+ implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlobalCommitterOperator.class);
+
+ /** Record all the committables until commit. */
+ private final Deque<CommT> committables = new ArrayDeque<>();
+
+ /**
+ * Aggregate committables to global committables and commit the global committables to the
+ * external system.
+ */
+ private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+ /** The operator's state descriptor. */
+ private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+ new ListStateDescriptor<>(
+ "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+
+ /** Group the committable by the checkpoint id. */
+ private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
+
+ /** The committable's serializer. */
+ private final SimpleVersionedSerializer<GlobalCommT> committableSerializer;
+
+ /** The operator's state. */
+ private ListState<GlobalCommT> streamingCommitterState;
+
+ public GlobalCommitterOperator(
+ GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+ SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+ this.globalCommitter = checkNotNull(globalCommitter);
+ this.committableSerializer = committableSerializer;
+ this.committablesPerCheckpoint = new TreeMap<>();
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ streamingCommitterState =
+ new SimpleVersionedListState<>(
+ context.getOperatorStateStore()
+ .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+ committableSerializer);
+ List<GlobalCommT> restored = new ArrayList<>();
+ streamingCommitterState.get().forEach(restored::add);
+ streamingCommitterState.clear();
+ globalCommitter.commit(globalCommitter.filterRecoveredCommittables(restored));
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ List<CommT> committables = pollCommittables();
+ if (committables.size() > 0) {
+ committablesPerCheckpoint.put(
+ context.getCheckpointId(),
+ globalCommitter.combine(context.getCheckpointId(), committables));
+ }
+ streamingCommitterState.update(new ArrayList<>(committablesPerCheckpoint.values()));
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ List<CommT> allCommittables = pollCommittables();
+ if (!allCommittables.isEmpty()) {
+ globalCommitter.commit(
+ Collections.singletonList(
+ globalCommitter.combine(Long.MAX_VALUE, allCommittables)));
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ LOG.info("Committing the state for checkpoint {}", checkpointId);
+ NavigableMap<Long, GlobalCommT> headMap =
+ committablesPerCheckpoint.headMap(checkpointId, true);
+ globalCommitter.commit(new ArrayList<>(headMap.values()));
+ headMap.clear();
+ }
+
+ @Override
+ public void processElement(StreamRecord<CommittableMessage<CommT>> element) {
+ CommittableMessage<CommT> message = element.getValue();
+ if (message instanceof CommittableWithLineage) {
+ this.committables.add(((CommittableWithLineage<CommT>) message).getCommittable());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ globalCommitter.close();
+ committablesPerCheckpoint.clear();
+ committables.clear();
+ super.close();
+ }
+
+ private List<CommT> pollCommittables() {
+ List<CommT> committables = new ArrayList<>(this.committables);
+ this.committables.clear();
+ return committables;
+ }
+}
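The operator's lifecycle, in order: processElement() buffers committables; snapshotState() folds the buffer into one global committable per checkpoint and persists all pending ones; notifyCheckpointComplete() commits every pending checkpoint up to the completed id; endInput() combines and commits the remainder for bounded inputs. A hedged driving sketch using Flink's operator test harness (this is what the flink-streaming-java test-jar added to the pom above provides; stringSerializer() is a hypothetical SimpleVersionedSerializer<String> factory, and ConcatGlobalCommitter is the sketch shown after GlobalCommitter above):

    GlobalCommitterOperator<String, String> operator =
            new GlobalCommitterOperator<>(new ConcatGlobalCommitter(), stringSerializer());
    OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> harness =
            new OneInputStreamOperatorTestHarness<>(operator);
    harness.open();
    harness.processElement(new StreamRecord<>(new CommittableWithLineage<>("a", 1L, 0)));
    harness.snapshot(1L, 1L);                // combine(1, ["a"]) becomes pending
    harness.notifyOfCompletedCheckpoint(1L); // commit(...) runs, pending entry cleared
    harness.close();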
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
new file mode 100644
index 0000000..66b4cd6
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link GlobalCommitter} that
+ * actually commits the data.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ * @param <GlobalCommT> The type of the aggregated committable.
+ */
+public interface GlobalCommittingSink<InputT, CommT, GlobalCommT> extends Sink<InputT> {
+
+ /**
+ * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
+ * input.
+ */
+ PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+
+ /**
+ * Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
+ * through {@link GlobalCommitter#commit}.
+ */
+ GlobalCommitter<CommT, GlobalCommT> createCommitter() throws IOException;
+
+ /** Returns the serializer of the committable type. */
+ SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+ /** Returns the serializer of the global committable type. */
+ SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer();
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
new file mode 100644
index 0000000..4d83149
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+
+import java.io.IOException;
+
+/** A translator for the {@link GlobalCommittingSink}. */
+public class GlobalCommittingSinkTranslator {
+
+ private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+
+ private static final String WRITER_NAME = "Writer";
+
+ public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
+ DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink)
+ throws IOException {
+ TypeInformation<CommittableMessage<CommT>> commitType =
+ CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
+ SingleOutputStreamOperator<CommittableMessage<CommT>> written =
+ input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink));
+
+ SingleOutputStreamOperator<Void> committed =
+ written.global()
+ .transform(
+ GLOBAL_COMMITTER_NAME,
+ Types.VOID,
+ new GlobalCommitterOperator<>(
+ sink.createCommitter(),
+ sink.getGlobalCommittableSerializer()))
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+ }
+}
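A hedged end-to-end usage sketch of the translator (illustrative: buildInput() and createStoreSink() are hypothetical stand-ins, since configuring a concrete sink such as StoreSink depends on the table definition; the caller must also handle the declared IOException):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RowData> input = buildInput(env); // hypothetical RowData source
    GlobalCommittingSinkTranslator.translate(input, createStoreSink());
    env.execute();

As the code above shows, translate() expands the sink into a "Writer" operator at the input's parallelism that emits CommittableMessage records, a global() shuffle into the single-parallelism "Global Committer" operator, and a discarding sink named "end".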
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
new file mode 100644
index 0000000..2d59440
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CommittableSerializer}. */
+public class CommittableSerializerTest {
+
+ @Test
+ public void test() {
+ byte[] bytes = new byte[] {4, 5, 1};
+ Committable committable = new Committable(Committable.Kind.LOG, bytes, 9);
+ byte[] serialize = CommittableSerializer.INSTANCE.serialize(committable);
+ Committable deser = CommittableSerializer.INSTANCE.deserialize(1, serialize);
+ assertThat(deser.kind()).isEqualTo(Committable.Kind.LOG);
+ assertThat(deser.serializerVersion()).isEqualTo(9);
+ assertThat(deser.wrappedCommittable()).isEqualTo(bytes);
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
similarity index 84%
rename from flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
rename to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
index 6fd2ac3..5013529 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
@@ -30,19 +30,19 @@ import static org.apache.flink.table.store.file.manifest.ManifestCommittableSeri
import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link LocalCommittableSerializer}. */
-public class LocalCommittableSerializerTest {
+/** Test for {@link FileCommittableSerializer}. */
+public class FileCommittableSerializerTest {
@Test
public void test() throws IOException {
- LocalCommittableSerializer serializer =
- new LocalCommittableSerializer(
+ FileCommittableSerializer serializer =
+ new FileCommittableSerializer(
RowType.of(new IntType()),
RowType.of(new IntType()),
RowType.of(new IntType()));
Increment increment = randomIncrement();
- LocalCommittable committable = new LocalCommittable(row(0), 1, increment);
- LocalCommittable newCommittable =
+ FileCommittable committable = new FileCommittable(row(0), 1, increment);
+ FileCommittable newCommittable =
serializer.deserialize(1, serializer.serialize(committable));
assertThat(newCommittable).isEqualTo(committable);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
new file mode 100644
index 0000000..31753c3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GlobalCommittableSerializer}. */
+public class GlobalCommittableSerializerTest {
+
+ @Test
+ public void test() throws IOException {
+ List<String> logs = Arrays.asList("1", "2");
+ GlobalCommittable<String> committable =
+ new GlobalCommittable<>(logs, ManifestCommittableSerializerTest.create());
+ GlobalCommittableSerializer<String> serializer =
+ new GlobalCommittableSerializer<>(
+ new TestStringSerializer(), ManifestCommittableSerializerTest.serializer());
+ byte[] serialized = serializer.serialize(committable);
+ GlobalCommittable<String> deser = serializer.deserialize(1,
serialized);
+
assertThat(deser.logCommittables()).isEqualTo(committable.logCommittables());
+
assertThat(deser.fileCommittable()).isEqualTo(committable.fileCommittable());
+ }
+
+ private static final class TestStringSerializer implements
SimpleVersionedSerializer<String> {
+
+ private static final int VERSION = 1073741823;
+
+ private TestStringSerializer() {}
+
+ public int getVersion() {
+ return VERSION;
+ }
+
+ public byte[] serialize(String str) {
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+
+ public String deserialize(int version, byte[] serialized) {
+ assertThat(version).isEqualTo(VERSION);
+ return new String(serialized, StandardCharsets.UTF_8);
+ }
+ }
+}
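
The test above only pins down the composite behavior: a GlobalCommittable round-trips through a per-log serializer (TestStringSerializer) plus the ManifestCommittableSerializer. The actual wire layout is not reproduced in this mail; a minimal sketch of the nesting pattern, assuming a length-prefixed layout and hypothetical fields logSerializer and fileSerializer, would be:

    // Sketch only; the layout and the field names are assumptions, not the committed code.
    public byte[] serialize(GlobalCommittable<T> committable) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(64); // org.apache.flink.core.memory
        out.writeInt(logSerializer.getVersion());
        out.writeInt(committable.logCommittables().size());
        for (T log : committable.logCommittables()) {
            byte[] bytes = logSerializer.serialize(log); // nested SimpleVersionedSerializer
            out.writeInt(bytes.length);
            out.write(bytes);
        }
        byte[] file = fileSerializer.serialize(committable.fileCommittable());
        out.writeInt(file.length);
        out.write(file);
        return out.getCopyOfBuffer();
    }
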
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittableTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittableTest.java
new file mode 100644
index 0000000..bf1330d
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittableTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LogOffsetCommittable}. */
+public class LogOffsetCommittableTest {
+
+ @Test
+ public void test() {
+ LogOffsetCommittable committable = new LogOffsetCommittable(5, 9);
+ LogOffsetCommittable deser = LogOffsetCommittable.fromBytes(committable.toBytes());
+ assertThat(deser.bucket()).isEqualTo(committable.bucket());
+ assertThat(deser.offset()).isEqualTo(committable.offset());
+ }
+}
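
LogOffsetCommittable carries just a bucket and a log offset, so the toBytes()/fromBytes() pair exercised above amounts to a fixed-width encoding. One plausible sketch, assuming an int bucket and a long offset (the committed implementation is not shown in this mail):

    import java.nio.ByteBuffer;

    public byte[] toBytes() {
        // 4 bytes bucket + 8 bytes offset
        return ByteBuffer.allocate(12).putInt(bucket).putLong(offset).array();
    }

    public static LogOffsetCommittable fromBytes(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new LogOffsetCommittable(buffer.getInt(), buffer.getLong());
    }
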
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index e69ec17..f469ce6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -56,7 +55,7 @@ public class StoreSinkTest {
@Test
public void testChangelogs() throws Exception {
- StoreSink sink = newSink(null);
+ StoreSink<?, ?> sink = newSink(null);
writeAndCommit(
sink,
GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
@@ -73,8 +72,8 @@ public class StoreSinkTest {
@Test
public void testNoKeyChangelogs() throws Exception {
- StoreSink sink =
- new StoreSink(
+ StoreSink<?, ?> sink =
+ new StoreSink<>(
identifier,
fileStore,
rowType,
@@ -99,7 +98,7 @@ public class StoreSinkTest {
@Test
public void testAppend() throws Exception {
- StoreSink sink = newSink(null);
+ StoreSink<?, ?> sink = newSink(null);
writeAndAssert(sink);
writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
@@ -111,7 +110,7 @@ public class StoreSinkTest {
@Test
public void testOverwrite() throws Exception {
- StoreSink sink = newSink(new HashMap<>());
+ StoreSink<?, ?> sink = newSink(new HashMap<>());
writeAndAssert(sink);
writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
@@ -126,7 +125,7 @@ public class StoreSinkTest {
public void testOverwritePartition() throws Exception {
HashMap<String, String> partition = new HashMap<>();
partition.put("part", "0");
- StoreSink sink = newSink(partition);
+ StoreSink<?, ?> sink = newSink(partition);
writeAndAssert(sink);
writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
@@ -138,7 +137,7 @@ public class StoreSinkTest {
.isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
}
- private void writeAndAssert(StoreSink sink) throws Exception {
+ private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
writeAndCommit(
sink,
GenericRowData.of(0, 0, 1),
@@ -153,17 +152,17 @@ public class StoreSinkTest {
.isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1",
"ADD-key-7-value-0/7/5"));
}
- private void writeAndCommit(StoreSink sink, RowData... rows) throws Exception {
+ private void writeAndCommit(StoreSink<?, ?> sink, RowData... rows) throws Exception {
commit(sink, write(sink, rows));
}
- private List<LocalCommittable> write(StoreSink sink, RowData... rows) throws Exception {
- StoreSinkWriter writer = sink.createWriter(null, null);
+ private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Exception {
+ StoreSinkWriter<?> writer = sink.createWriter(null);
for (RowData row : rows) {
writer.write(row, null);
}
- List<LocalCommittable> localCommittables = writer.prepareCommit(true);
+ List<Committable> committables = writer.prepareCommit();
Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>(writer.writers());
assertThat(writers.size()).isGreaterThan(0);
@@ -176,12 +175,12 @@ public class StoreSinkTest {
assertThat(testWriter.synced).isTrue();
assertThat(testWriter.closed).isTrue();
}));
- return localCommittables;
+ return committables;
}
- private void commit(StoreSink sink, List<LocalCommittable> localCommittables) throws Exception {
- StoreGlobalCommitter committer = (StoreGlobalCommitter) sink.createGlobalCommitter().get();
- ManifestCommittable committable = committer.combine(localCommittables);
+ private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
+ StoreGlobalCommitter committer = sink.createCommitter();
+ GlobalCommittable<?> committable = committer.combine(0, fileCommittables);
fileStore.expired = false;
lock.locked = false;
@@ -200,8 +199,8 @@ public class StoreSinkTest {
assertThat(lock.closed).isTrue();
}
- private StoreSink newSink(Map<String, String> overwritePartition) {
- return new StoreSink(
+ private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
+ return new StoreSink<>(
identifier,
fileStore,
rowType,
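
Stripped of assertions, the Sink V2 call sequence that this test drives by hand is:

    // Condensed from the test; contexts are stubbed with null, as in the test itself.
    StoreSinkWriter<?> writer = sink.createWriter(null);
    writer.write(row, null);                                  // per record
    List<Committable> committables = writer.prepareCommit();  // at checkpoint
    StoreGlobalCommitter committer = sink.createCommitter();
    GlobalCommittable<?> committable = committer.combine(0, committables);
    // the commit(...) helper above then hands the combined committable to the committer
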
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 87eee11..e31ffdc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.utils.RecordWriter;
import java.util.ArrayList;
@@ -130,7 +131,22 @@ public class TestFileStore implements FileStore {
public Increment prepareCommit() {
List<SstFileMeta> newFiles =
records.stream()
- .map(s -> new SstFileMeta(s, 0, 0, null, null, null, 0, 0, 0))
+ .map(
+ s ->
+ new SstFileMeta(
+ s,
+ 0,
+ 0,
+ null,
+ null,
+ new FieldStats[] {
+ new FieldStats(null, null, 0),
+ new FieldStats(null, null, 0),
+ new FieldStats(null, null, 0)
+ },
+ 0,
+ 0,
+ 0))
.collect(Collectors.toList());
return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
}
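
SstFileMeta now takes real per-field statistics where the test previously passed null; the stub writer fills in one FieldStats per column of its three-column schema. Assuming the signature is FieldStats(minValue, maxValue, nullCount), which this mail does not spell out, a neutral placeholder for tests is:

    // min/max unknown, no nulls counted
    FieldStats placeholder = new FieldStats(null, null, 0);
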
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
new file mode 100644
index 0000000..77176ef
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GlobalCommitterOperator}. */
+public class GlobalCommitterOperatorTest {
+
+ @Test
+ public void closeCommitter() throws Exception {
+ final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ createTestHarness(globalCommitter);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ testHarness.close();
+ assertThat(globalCommitter.isClosed()).isTrue();
+ }
+
+ @Test
+ public void restoredFromMergedState() throws Exception {
+
+ final List<String> input1 = Arrays.asList("host", "drop");
+ final OperatorSubtaskState operatorSubtaskState1 =
+ buildSubtaskState(createTestHarness(), input1);
+
+ final List<String> input2 = Arrays.asList("future", "evil", "how");
+ final OperatorSubtaskState operatorSubtaskState2 =
+ buildSubtaskState(createTestHarness(), input2);
+
+ final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ createTestHarness(globalCommitter);
+
+ final OperatorSubtaskState mergedOperatorSubtaskState =
+ OneInputStreamOperatorTestHarness.repackageState(
+ operatorSubtaskState1, operatorSubtaskState2);
+
+ testHarness.initializeState(
+ OneInputStreamOperatorTestHarness.repartitionOperatorState(
+ mergedOperatorSubtaskState, 2, 2, 1, 0));
+ testHarness.open();
+
+ final List<String> expectedOutput = new ArrayList<>();
+ expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
+ expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
+
+ testHarness.snapshot(1L, 1L);
+ testHarness.notifyOfCompletedCheckpoint(1L);
+ testHarness.close();
+
+ assertThat(globalCommitter.getCommittedData())
+ .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
+ }
+
+ @Test
+ public void commitMultipleStagesTogether() throws Exception {
+
+ final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+
+ final List<String> input1 = Arrays.asList("cautious", "nature");
+ final List<String> input2 = Arrays.asList("count", "over");
+ final List<String> input3 = Arrays.asList("lawyer", "grammar");
+
+ final List<String> expectedOutput = new ArrayList<>();
+
+ expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
+ expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
+ expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
+
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ createTestHarness(globalCommitter);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+
+ testHarness.processElements(committableRecords(input1));
+ testHarness.snapshot(1L, 1L);
+ testHarness.processElements(committableRecords(input2));
+ testHarness.snapshot(2L, 2L);
+ testHarness.processElements(committableRecords(input3));
+ testHarness.snapshot(3L, 3L);
+
+ testHarness.notifyOfCompletedCheckpoint(3L);
+
+ testHarness.close();
+
+ assertThat(globalCommitter.getCommittedData())
+ .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
+ }
+
+ @Test
+ public void filterRecoveredCommittables() throws Exception {
+ final List<String> input = Arrays.asList("silent", "elder", "patience");
+ final String successCommittedCommittable = DefaultGlobalCommitter.COMBINER.apply(input);
+
+ final OperatorSubtaskState operatorSubtaskState =
+ buildSubtaskState(createTestHarness(), input);
+ final DefaultGlobalCommitter globalCommitter =
+ new DefaultGlobalCommitter(successCommittedCommittable);
+
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ createTestHarness(globalCommitter);
+
+ // all data from the previous checkpoint are expected to be committed,
+ // so we expect no data to be re-committed.
+ testHarness.initializeState(operatorSubtaskState);
+ testHarness.open();
+ testHarness.snapshot(1L, 1L);
+ testHarness.notifyOfCompletedCheckpoint(1L);
+ assertThat(globalCommitter.getCommittedData().isEmpty()).isTrue();
+ testHarness.close();
+ }
+
+ @Test
+ public void endOfInput() throws Exception {
+ final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+
+ final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+ createTestHarness(globalCommitter);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ List<String> input = Arrays.asList("silent", "elder", "patience");
+ testHarness.processElements(committableRecords(input));
+ testHarness.endInput();
+ testHarness.close();
+ assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
+ }
+
+ private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness()
+ throws Exception {
+ return createTestHarness(new DefaultGlobalCommitter());
+ }
+
+ private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness(
+ GlobalCommitter<String, String> globalCommitter) throws Exception {
+ return new OneInputStreamOperatorTestHarness<>(
+ new GlobalCommitterOperator<>(
+ globalCommitter, StringCommittableSerializer.INSTANCE),
+ CommittableMessageTypeInfo.of(
+ (SerializableSupplier<SimpleVersionedSerializer<String>>)
+ () -> StringCommittableSerializer.INSTANCE)
+ .createSerializer(new ExecutionConfig()));
+ }
+
+ public static OperatorSubtaskState buildSubtaskState(
+ OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness,
+ List<String> input)
+ throws Exception {
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ testHarness.processElements(
+ input.stream()
+ .map(GlobalCommitterOperatorTest::toCommittableMessage)
+ .map(StreamRecord::new)
+ .collect(Collectors.toList()));
+ testHarness.prepareSnapshotPreBarrier(1L);
+ OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(1L, 1L);
+ testHarness.close();
+ return operatorSubtaskState;
+ }
+
+ private static List<StreamRecord<CommittableMessage<String>>> committableRecords(
+ Collection<String> elements) {
+ return elements.stream()
+ .map(GlobalCommitterOperatorTest::toCommittableMessage)
+ .map(StreamRecord::new)
+ .collect(Collectors.toList());
+ }
+
+ private static CommittableMessage<String> toCommittableMessage(String input) {
+ return new CommittableWithLineage<>(input, null, -1);
+ }
+
+ /** A {@link GlobalCommitter} that always commits global committables successfully. */
+ private static class DefaultGlobalCommitter implements GlobalCommitter<String, String> {
+
+ private static final Function<List<String>, String> COMBINER =
+ strings -> {
+ // we sort here because we want to have a deterministic result during the unit
+ // test
+ Collections.sort(strings);
+ return String.join("+", strings);
+ };
+
+ private final Queue<String> committedData;
+
+ private boolean isClosed;
+
+ private final String committedSuccessData;
+
+ DefaultGlobalCommitter() {
+ this("");
+ }
+
+ DefaultGlobalCommitter(String committedSuccessData) {
+ this.committedData = new ConcurrentLinkedQueue<>();
+ this.isClosed = false;
+ this.committedSuccessData = committedSuccessData;
+ }
+
+ @Override
+ public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
+ if (committedSuccessData == null) {
+ return globalCommittables;
+ }
+ return globalCommittables.stream()
+ .filter(s -> !s.equals(committedSuccessData))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public String combine(long checkpointId, List<String> committables) {
+ return COMBINER.apply(committables);
+ }
+
+ @Override
+ public void commit(List<String> committables) {
+ committedData.addAll(committables);
+ }
+
+ public List<String> getCommittedData() {
+ if (committedData != null) {
+ return new ArrayList<>(committedData);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+ }
+}
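
Taken together, these tests pin down the operator's lifecycle: inputs are combined into one global committable per checkpoint at snapshot time, committed when the checkpoint completes (or at end of input), and filtered against already-committed data on restore. In harness terms:

    OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> harness =
            createTestHarness(new DefaultGlobalCommitter());
    harness.initializeEmptyState();
    harness.open();
    harness.processElements(committableRecords(Arrays.asList("a", "b")));
    harness.snapshot(1L, 1L);                // the combined committable is buffered in state
    harness.notifyOfCompletedCheckpoint(1L); // commit("a+b") actually runs
    harness.close();
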
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 9b9085b..e30286d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -37,7 +37,7 @@ public class Snapshot {
private static final String FIELD_ID = "id";
private static final String FIELD_MANIFEST_LIST = "manifestList";
private static final String FIELD_COMMIT_USER = "commitUser";
- private static final String FIELD_COMMIT_UUID = "commitUuid";
+ private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
private static final String FIELD_TIME_MILLIS = "timeMillis";
@@ -51,8 +51,8 @@ public class Snapshot {
private final String commitUser;
// for deduplication
- @JsonProperty(FIELD_COMMIT_UUID)
- private final String commitUuid;
+ @JsonProperty(FIELD_COMMIT_IDENTIFIER)
+ private final String commitIdentifier;
@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
@@ -65,13 +65,13 @@ public class Snapshot {
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
- @JsonProperty(FIELD_COMMIT_UUID) String commitUuid,
+ @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
this.id = id;
this.manifestList = manifestList;
this.commitUser = commitUser;
- this.commitUuid = commitUuid;
+ this.commitIdentifier = commitIdentifier;
this.commitKind = commitKind;
this.timeMillis = timeMillis;
}
@@ -91,9 +91,9 @@ public class Snapshot {
return commitUser;
}
- @JsonGetter(FIELD_COMMIT_UUID)
- public String commitUuid() {
- return commitUuid;
+ @JsonGetter(FIELD_COMMIT_IDENTIFIER)
+ public String commitIdentifier() {
+ return commitIdentifier;
}
@JsonGetter(FIELD_COMMIT_KIND)
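
For illustration, a snapshot written after this change carries a caller-chosen identifier instead of a random UUID; all values below are made up:

    Snapshot snapshot = new Snapshot(
            1L,                      // id
            "manifest-list-1",       // manifestList
            "user-a",                // commitUser
            "42",                    // commitIdentifier, e.g. a checkpoint id as a string
            Snapshot.CommitKind.APPEND,
            System.currentTimeMillis());
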
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index 765fe1a..a461c2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -27,37 +27,52 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.UUID;
/** Manifest commit message. */
public class ManifestCommittable {
- private final String uuid;
+ private final String identifier;
+ private final Map<Integer, Long> logOffsets;
private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles;
private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore;
private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter;
- public ManifestCommittable() {
- this(UUID.randomUUID().toString(), new HashMap<>(), new HashMap<>(), new HashMap<>());
+ public ManifestCommittable(String identifier) {
+ this.identifier = identifier;
+ this.logOffsets = new HashMap<>();
+ this.newFiles = new HashMap<>();
+ this.compactBefore = new HashMap<>();
+ this.compactAfter = new HashMap<>();
}
public ManifestCommittable(
- String uuid,
+ String identifier,
+ Map<Integer, Long> logOffsets,
Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles,
Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore,
Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter) {
- this.uuid = uuid;
+ this.identifier = identifier;
+ this.logOffsets = logOffsets;
this.newFiles = newFiles;
this.compactBefore = compactBefore;
this.compactAfter = compactAfter;
}
- public void add(BinaryRowData partition, int bucket, Increment increment) {
+ public void addFileCommittable(BinaryRowData partition, int bucket, Increment increment) {
addFiles(newFiles, partition, bucket, increment.newFiles());
addFiles(compactBefore, partition, bucket, increment.compactBefore());
addFiles(compactAfter, partition, bucket, increment.compactAfter());
}
+ public void addLogOffset(int bucket, long offset) {
+ if (logOffsets.containsKey(bucket)) {
+ throw new RuntimeException(
+ String.format(
+ "bucket-%d appears multiple times, which is not
possible.", bucket));
+ }
+ logOffsets.put(bucket, offset);
+ }
+
private static void addFiles(
Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map,
BinaryRowData partition,
@@ -68,8 +83,12 @@ public class ManifestCommittable {
.addAll(files);
}
- public String uuid() {
- return uuid;
+ public String identifier() {
+ return identifier;
+ }
+
+ public Map<Integer, Long> logOffsets() {
+ return logOffsets;
}
public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles() {
@@ -93,7 +112,8 @@ public class ManifestCommittable {
return false;
}
ManifestCommittable that = (ManifestCommittable) o;
- return Objects.equals(uuid, that.uuid)
+ return Objects.equals(identifier, that.identifier)
+ && Objects.equals(logOffsets, that.logOffsets)
&& Objects.equals(newFiles, that.newFiles)
&& Objects.equals(compactBefore, that.compactBefore)
&& Objects.equals(compactAfter, that.compactAfter);
@@ -101,19 +121,23 @@ public class ManifestCommittable {
@Override
public int hashCode() {
- return Objects.hash(uuid, newFiles, compactBefore, compactAfter);
+ return Objects.hash(identifier, logOffsets, newFiles, compactBefore, compactAfter);
}
@Override
public String toString() {
- return "uuid: "
- + uuid
- + "\nnew files:\n"
+ return "ManifestCommittable{"
+ + "identifier="
+ + identifier
+ + ", logOffsets="
+ + logOffsets
+ + ", newFiles="
+ filesToString(newFiles)
- + "compact before:\n"
+ + ", compactBefore="
+ filesToString(compactBefore)
- + "compact after:\n"
- + filesToString(compactAfter);
+ + ", compactAfter="
+ + filesToString(compactAfter)
+ + '}';
}
private static String filesToString(Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files) {
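
Typical usage after this change, with partition and increment standing in for real values:

    ManifestCommittable committable = new ManifestCommittable("42"); // identifier chosen by caller
    committable.addFileCommittable(partition, 0, increment);         // bucket 0
    committable.addLogOffset(0, 128L);
    // a second offset for the same bucket throws:
    // committable.addLogOffset(0, 256L); -> RuntimeException
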
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index 81e4e1f..7ceb607 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -55,13 +55,32 @@ public class ManifestCommittableSerializer
public byte[] serialize(ManifestCommittable obj) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
- view.writeUTF(obj.uuid());
+ view.writeUTF(obj.identifier());
+ serializeOffsets(view, obj.logOffsets());
serializeFiles(view, obj.newFiles());
serializeFiles(view, obj.compactBefore());
serializeFiles(view, obj.compactAfter());
return out.toByteArray();
}
+ private void serializeOffsets(DataOutputViewStreamWrapper view, Map<Integer, Long> offsets)
+ throws IOException {
+ view.writeInt(offsets.size());
+ for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+ view.writeInt(entry.getKey());
+ view.writeLong(entry.getValue());
+ }
+ }
+
+ private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws IOException {
+ int size = view.readInt();
+ Map<Integer, Long> offsets = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ offsets.put(view.readInt(), view.readLong());
+ }
+ return offsets;
+ }
+
private void serializeFiles(
DataOutputViewStreamWrapper view,
Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files)
@@ -107,6 +126,7 @@ public class ManifestCommittableSerializer
DataInputDeserializer view = new DataInputDeserializer(serialized);
return new ManifestCommittable(
view.readUTF(),
+ deserializeOffsets(view),
deserializeFiles(view),
deserializeFiles(view),
deserializeFiles(view));
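
The serialized layout is therefore: the identifier via writeUTF, then the offset map as a count followed by (int bucket, long offset) pairs, then the three file sections. A round trip using the static helpers that ManifestCommittableSerializerTest exposes further below:

    ManifestCommittable committable = new ManifestCommittable("42");
    committable.addLogOffset(0, 128L);
    ManifestCommittableSerializer serializer = ManifestCommittableSerializerTest.serializer();
    byte[] bytes = serializer.serialize(committable);
    assertThat(serializer.deserialize(1, bytes)).isEqualTo(committable);
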
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index cb854ef..15129b3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -115,18 +115,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return committableList;
}
- // check if a committable is already committed by its uuid
- Map<String, ManifestCommittable> uuids = new LinkedHashMap<>();
+ // check if a committable is already committed by its identifier
+ Map<String, ManifestCommittable> identifiers = new LinkedHashMap<>();
for (ManifestCommittable committable : committableList) {
- uuids.put(committable.uuid(), committable);
+ identifiers.put(committable.identifier(), committable);
}
for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
Path snapshotPath = pathFactory.toSnapshotPath(id);
Snapshot snapshot = Snapshot.fromPath(snapshotPath);
if (commitUser.equals(snapshot.commitUser())) {
- if (uuids.containsKey(snapshot.commitUuid())) {
- uuids.remove(snapshot.commitUuid());
+ if (identifiers.containsKey(snapshot.commitIdentifier())) {
+ identifiers.remove(snapshot.commitIdentifier());
} else {
// early exit, because committableList must be the latest commits by this
// commit user
@@ -135,7 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
- return new ArrayList<>(uuids.values());
+ return new ArrayList<>(identifiers.values());
}
@Override
@@ -145,13 +145,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
- tryCommit(appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND, false);
+ tryCommit(appendChanges, committable.identifier(), Snapshot.CommitKind.APPEND, false);
List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
if (!compactChanges.isEmpty()) {
- tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
+ tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
}
}
@@ -183,13 +183,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// overwrite new files
tryOverwrite(
- partitionFilter, appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND);
+ partitionFilter,
+ appendChanges,
+ committable.identifier(),
+ Snapshot.CommitKind.APPEND);
List<ManifestEntry> compactChanges = new ArrayList<>();
compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
if (!compactChanges.isEmpty()) {
- tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
+ tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
}
}
@@ -209,7 +212,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private void tryOverwrite(
Predicate partitionFilter,
List<ManifestEntry> changes,
- String hash,
+ String identifier,
Snapshot.CommitKind commitKind) {
while (true) {
Long latestSnapshotId = pathFactory.latestSnapshotId();
@@ -233,7 +236,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
changesWithOverwrite.addAll(changes);
- if (tryCommitOnce(changesWithOverwrite, hash, commitKind, latestSnapshotId, false)) {
+ if (tryCommitOnce(
+ changesWithOverwrite, identifier, commitKind, latestSnapshotId, false)) {
break;
}
}
@@ -264,7 +268,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private boolean tryCommitOnce(
List<ManifestEntry> changes,
- String hash,
+ String identifier,
Snapshot.CommitKind commitKind,
Long latestSnapshotId,
boolean checkDeletedFiles) {
@@ -311,7 +315,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
newSnapshotId,
manifestListName,
commitUser,
- hash,
+ identifier,
commitKind,
System.currentTimeMillis());
FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
@@ -325,7 +329,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
newSnapshotId,
newSnapshotPath.toString(),
commitUser,
- hash,
+ identifier,
commitKind.name()),
e);
}
@@ -354,12 +358,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
throw new RuntimeException(
String.format(
"Exception occurs when committing snapshot #%d
(path %s) by user %s "
- + "with hash %s and kind %s. "
+ + "with identifier %s and kind %s. "
+ "Cannot clean up because we can't
determine the success.",
newSnapshotId,
newSnapshotPath.toString(),
commitUser,
- hash,
+ identifier,
commitKind.name()),
e);
}
@@ -372,12 +376,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
LOG.warn(
String.format(
"Atomic rename failed for snapshot #%d (path %s) by
user %s "
- + "with hash %s and kind %s. "
+ + "with identifier %s and kind %s. "
+ "Clean up and try again.",
newSnapshotId,
newSnapshotPath.toString(),
commitUser,
- hash,
+ identifier,
commitKind.name()));
cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
return false;
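
The deduplication contract of filterCommitted reads easiest condensed; this is equivalent to the hunk above because Map.remove returns null exactly when the identifier was absent:

    for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
        Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(id));
        if (commitUser.equals(snapshot.commitUser())
                && identifiers.remove(snapshot.commitIdentifier()) == null) {
            // everything older has been committed; the remaining identifiers are uncommitted
            break;
        }
    }
    return new ArrayList<>(identifiers.values());
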
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 4ebbf85..10b06c3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
@@ -41,30 +42,43 @@ public class ManifestCommittableSerializerTest {
@Test
public void testCommittableSerDe() throws IOException {
- ManifestCommittableSerializer serializer =
- new ManifestCommittableSerializer(
- RowType.of(new IntType()),
- RowType.of(new IntType()),
- RowType.of(new IntType()));
- ManifestCommittable committable = new ManifestCommittable();
+ ManifestCommittableSerializer serializer = serializer();
+ ManifestCommittable committable = create();
+ byte[] serialized = serializer.serialize(committable);
+ assertThat(serializer.deserialize(1, serialized)).isEqualTo(committable);
+ }
+
+ public static ManifestCommittableSerializer serializer() {
+ return new ManifestCommittableSerializer(
+ RowType.of(new IntType()), RowType.of(new IntType()), RowType.of(new IntType()));
+ }
+
+ public static ManifestCommittable create() {
+ ManifestCommittable committable =
+ new ManifestCommittable(String.valueOf(new Random().nextLong()));
addAndAssert(committable, row(0), 0);
addAndAssert(committable, row(0), 1);
addAndAssert(committable, row(1), 0);
addAndAssert(committable, row(1), 1);
- byte[] serialized = serializer.serialize(committable);
- assertThat(serializer.deserialize(1, serialized)).isEqualTo(committable);
+ return committable;
}
- private void addAndAssert(
+ private static void addAndAssert(
ManifestCommittable committable, BinaryRowData partition, int bucket) {
Increment increment = randomIncrement();
- committable.add(partition, bucket, increment);
+ committable.addFileCommittable(partition, bucket, increment);
assertThat(committable.newFiles().get(partition).get(bucket))
.isEqualTo(increment.newFiles());
assertThat(committable.compactBefore().get(partition).get(bucket))
.isEqualTo(increment.compactBefore());
assertThat(committable.compactAfter().get(partition).get(bucket))
.isEqualTo(increment.compactAfter());
+
+ if (!committable.logOffsets().containsKey(bucket)) {
+ int offset = ID.incrementAndGet();
+ committable.addLogOffset(bucket, offset);
+ assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
+ }
}
public static Increment randomIncrement() {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index 8b1941e..07a27d3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -224,13 +225,15 @@ public class OperationTestUtils {
}
FileStoreCommit commit = createCommit(fileFormat, pathFactory);
- ManifestCommittable committable = new ManifestCommittable();
+ ManifestCommittable committable =
+ new ManifestCommittable(String.valueOf(new Random().nextLong()));
for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
writers.entrySet()) {
for (Map.Entry<Integer, RecordWriter> entryWithBucket :
entryWithPartition.getValue().entrySet()) {
Increment increment = entryWithBucket.getValue().prepareCommit();
- committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+ committable.addFileCommittable(
+ entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 8033b40..43fb2e9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
@@ -99,9 +100,10 @@ public class TestCommitThread extends Thread {
for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
writeData();
}
- ManifestCommittable committable = new ManifestCommittable();
+ ManifestCommittable committable =
+ new ManifestCommittable(String.valueOf(new Random().nextLong()));
for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
- committable.add(entry.getKey(), 0, entry.getValue().prepareCommit());
+ committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit());
}
runWithRetry(committable, () -> commit.commit(committable, Collections.emptyMap()));
@@ -109,8 +111,9 @@ public class TestCommitThread extends Thread {
private void doOverwrite() throws Exception {
BinaryRowData partition = overwriteData();
- ManifestCommittable committable = new ManifestCommittable();
- committable.add(partition, 0, writers.get(partition).prepareCommit());
+ ManifestCommittable committable =
+ new ManifestCommittable(String.valueOf(new Random().nextLong()));
+ committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit());
runWithRetry(
committable,