This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7f91a85 [FLINK-26423] Integrate log store to StoreSink
7f91a85 is described below
commit 7f91a853e70bb558fd2e2aa619c83558b9bccaa7
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 8 14:47:41 2022 +0800
[FLINK-26423] Integrate log store to StoreSink
This closes #28
---
flink-table-store-connector/pom.xml | 14 ++
.../table/store/connector/sink/Committable.java | 13 +-
.../connector/sink/CommittableSerializer.java | 57 ++++-
.../store/connector/sink/GlobalCommittable.java | 44 ----
.../sink/GlobalCommittableSerializer.java | 94 --------
.../store/connector/sink/LogOffsetCommittable.java | 18 ++
.../store/connector/sink/StoreGlobalCommitter.java | 49 ++--
.../store/connector/sink/StoreLocalCommitter.java | 69 ++++++
.../table/store/connector/sink/StoreSink.java | 93 ++++++--
.../store/connector/sink/StoreSinkWriter.java | 64 +++++-
...perator.java => AbstractCommitterOperator.java} | 89 ++++---
.../sink/global/GlobalCommitterOperator.java | 112 ++-------
.../sink/global/GlobalCommittingSink.java | 6 -
.../global/GlobalCommittingSinkTranslator.java | 46 ++--
.../sink/global/LocalCommitterOperator.java | 205 +++++++++++++++++
.../table/store/connector/FileStoreITCase.java | 31 ++-
.../table/store/connector/FiniteTestSource.java | 156 +++++++++++++
.../connector/sink/CommittableSerializerTest.java | 71 +++++-
.../sink/GlobalCommittableSerializerTest.java | 69 ------
.../store/connector/sink/LogStoreSinkITCase.java | 168 ++++++++++++++
.../table/store/connector/sink/StoreSinkTest.java | 15 +-
.../sink/global/GlobalCommitterOperatorTest.java | 18 +-
.../sink/global/LocalCommitterOperatorTest.java | 255 +++++++++++++++++++++
.../flink/table/store/log/LogWriteCallback.java | 48 ++++
.../flink/table/store/kafka/KafkaLogTestUtils.java | 28 ++-
25 files changed, 1346 insertions(+), 486 deletions(-)
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 0b3e178..e4e8f38 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -211,6 +211,20 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
index b4ba992..cea1b35 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -23,28 +23,21 @@ public class Committable {
private final Kind kind;
- private final byte[] wrappedCommittable;
+ private final Object wrappedCommittable;
- private final int serializerVersion;
-
- public Committable(Kind kind, byte[] wrappedCommittable, int serializerVersion) {
+ public Committable(Kind kind, Object wrappedCommittable) {
this.kind = kind;
this.wrappedCommittable = wrappedCommittable;
- this.serializerVersion = serializerVersion;
}
public Kind kind() {
return kind;
}
- public byte[] wrappedCommittable() {
+ public Object wrappedCommittable() {
return wrappedCommittable;
}
- public int serializerVersion() {
- return serializerVersion;
- }
-
enum Kind {
FILE((byte) 0),
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index 82627ea..9593862 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -20,12 +20,22 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.io.IOException;
import java.nio.ByteBuffer;
/** {@link SimpleVersionedSerializer} for {@link Committable}. */
public class CommittableSerializer implements SimpleVersionedSerializer<Committable> {
- public static final CommittableSerializer INSTANCE = new CommittableSerializer();
+ private final FileCommittableSerializer fileCommittableSerializer;
+
+ private final SimpleVersionedSerializer<Object> logCommittableSerializer;
+
+ public CommittableSerializer(
+ FileCommittableSerializer fileCommittableSerializer,
+ SimpleVersionedSerializer<Object> logCommittableSerializer) {
+ this.fileCommittableSerializer = fileCommittableSerializer;
+ this.logCommittableSerializer = logCommittableSerializer;
+ }
@Override
public int getVersion() {
@@ -33,22 +43,57 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
}
@Override
- public byte[] serialize(Committable committable) {
- byte[] wrapped = committable.wrappedCommittable();
+ public byte[] serialize(Committable committable) throws IOException {
+ byte[] wrapped;
+ int version;
+ switch (committable.kind()) {
+ case FILE:
+ version = fileCommittableSerializer.getVersion();
+ wrapped =
+ fileCommittableSerializer.serialize(
+ (FileCommittable) committable.wrappedCommittable());
+ break;
+ case LOG:
+ version = logCommittableSerializer.getVersion();
+ wrapped = logCommittableSerializer.serialize(committable.wrappedCommittable());
+ break;
+ case LOG_OFFSET:
+ version = 1;
+ wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
+ }
+
return ByteBuffer.allocate(1 + wrapped.length + 4)
.put(committable.kind().toByteValue())
.put(wrapped)
- .putInt(committable.serializerVersion())
+ .putInt(version)
.array();
}
@Override
- public Committable deserialize(int i, byte[] bytes) {
+ public Committable deserialize(int i, byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
byte[] wrapped = new byte[bytes.length - 5];
buffer.get(wrapped);
int version = buffer.getInt();
- return new Committable(kind, wrapped, version);
+
+ Object wrappedCommittable;
+ switch (kind) {
+ case FILE:
+ wrappedCommittable = fileCommittableSerializer.deserialize(version, wrapped);
+ break;
+ case LOG:
+ wrappedCommittable = logCommittableSerializer.deserialize(version, wrapped);
+ break;
+ case LOG_OFFSET:
+ wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported kind: " + kind);
+ }
+ return new Committable(kind, wrappedCommittable);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
deleted file mode 100644
index ac49d15..0000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-
-import java.util.List;
-
-/** Global aggregated committable for {@link StoreGlobalCommitter}. */
-public class GlobalCommittable<LogCommT> {
-
- private final List<LogCommT> logCommittables;
-
- private final ManifestCommittable fileCommittable;
-
- public GlobalCommittable(List<LogCommT> logCommittables, ManifestCommittable fileCommittable) {
- this.logCommittables = logCommittables;
- this.fileCommittable = fileCommittable;
- }
-
- public List<LogCommT> logCommittables() {
- return logCommittables;
- }
-
- public ManifestCommittable fileCommittable() {
- return fileCommittable;
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
deleted file mode 100644
index ea180d2..0000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/** {@link SimpleVersionedSerializer} for {@link GlobalCommittable}. */
-public class GlobalCommittableSerializer<LogCommT>
- implements SimpleVersionedSerializer<GlobalCommittable<LogCommT>> {
-
- private final SimpleVersionedSerializer<LogCommT> logSerializer;
-
- private final ManifestCommittableSerializer fileSerializer;
-
- public GlobalCommittableSerializer(
- SimpleVersionedSerializer<LogCommT> logSerializer,
- ManifestCommittableSerializer fileSerializer) {
- this.logSerializer = logSerializer;
- this.fileSerializer = fileSerializer;
- }
-
- @Override
- public int getVersion() {
- return 1;
- }
-
- @Override
- public byte[] serialize(GlobalCommittable<LogCommT> committable) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
-
- view.writeInt(logSerializer.getVersion());
- view.writeInt(committable.logCommittables().size());
- for (LogCommT commT : committable.logCommittables()) {
- byte[] bytes = logSerializer.serialize(commT);
- view.writeInt(bytes.length);
- view.write(bytes);
- }
-
- view.writeInt(fileSerializer.getVersion());
- byte[] bytes = fileSerializer.serialize(committable.fileCommittable());
- view.writeInt(bytes.length);
- view.write(bytes);
-
- return out.toByteArray();
- }
-
- @Override
- public GlobalCommittable<LogCommT> deserialize(int version, byte[] serialized)
- throws IOException {
- DataInputDeserializer view = new DataInputDeserializer(serialized);
-
- int logVersion = view.readInt();
- int logSize = view.readInt();
- List<LogCommT> logCommTList = new ArrayList<>();
- for (int i = 0; i < logSize; i++) {
- byte[] bytes = new byte[view.readInt()];
- view.read(bytes);
- logCommTList.add(logSerializer.deserialize(logVersion, bytes));
- }
-
- int fileVersion = view.readInt();
- byte[] bytes = new byte[view.readInt()];
- view.read(bytes);
- ManifestCommittable file = fileSerializer.deserialize(fileVersion, bytes);
-
- return new GlobalCommittable<>(logCommTList, file);
- }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
index 68b62d8..8e47480 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector.sink;
import java.nio.ByteBuffer;
+import java.util.Objects;
/** Log offset committable for a bucket. */
public class LogOffsetCommittable {
@@ -48,4 +49,21 @@ public class LogOffsetCommittable {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return new LogOffsetCommittable(buffer.getInt(), buffer.getLong());
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LogOffsetCommittable that = (LogOffsetCommittable) o;
+ return bucket == that.bucket && offset == that.offset;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucket, offset);
+ }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index a70b3b1..bf9fec2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -27,22 +27,17 @@ import org.apache.flink.table.store.file.operation.FileStoreExpire;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/** {@link GlobalCommitter} for dynamic store. */
-public class StoreGlobalCommitter<LogCommT>
- implements GlobalCommitter<Committable, GlobalCommittable<LogCommT>> {
+public class StoreGlobalCommitter implements GlobalCommitter<Committable, ManifestCommittable> {
private final FileStoreCommit fileStoreCommit;
private final FileStoreExpire fileStoreExpire;
- private final FileCommittableSerializer fileCommitSerializer;
-
@Nullable private final CatalogLock lock;
@Nullable private final Map<String, String> overwritePartition;
@@ -50,12 +45,10 @@ public class StoreGlobalCommitter<LogCommT>
public StoreGlobalCommitter(
FileStoreCommit fileStoreCommit,
FileStoreExpire fileStoreExpire,
- FileCommittableSerializer fileCommitSerializer,
@Nullable CatalogLock lock,
@Nullable Map<String, String> overwritePartition) {
this.fileStoreCommit = fileStoreCommit;
this.fileStoreExpire = fileStoreExpire;
- this.fileCommitSerializer = fileCommitSerializer;
this.lock = lock;
this.overwritePartition = overwritePartition;
}
@@ -68,55 +61,45 @@ public class StoreGlobalCommitter<LogCommT>
}
@Override
- public List<GlobalCommittable<LogCommT>> filterRecoveredCommittables(
- List<GlobalCommittable<LogCommT>> globalCommittables) {
- List<ManifestCommittable> filtered =
- fileStoreCommit.filterCommitted(
- globalCommittables.stream()
- .map(GlobalCommittable::fileCommittable)
- .collect(Collectors.toList()));
- return globalCommittables.stream()
- .filter(c -> filtered.contains(c.fileCommittable()))
- .collect(Collectors.toList());
+ public List<ManifestCommittable> filterRecoveredCommittables(
+ List<ManifestCommittable> globalCommittables) {
+ return fileStoreCommit.filterCommitted(globalCommittables);
}
@Override
- public GlobalCommittable<LogCommT> combine(long checkpointId, List<Committable> committables)
+ public ManifestCommittable combine(long checkpointId, List<Committable> committables)
throws IOException {
- List<LogCommT> logCommittables = new ArrayList<>();
ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
for (Committable committable : committables) {
switch (committable.kind()) {
case FILE:
- FileCommittable file =
- fileCommitSerializer.deserialize(
- committable.serializerVersion(),
- committable.wrappedCommittable());
+ FileCommittable file = (FileCommittable) committable.wrappedCommittable();
fileCommittable.addFileCommittable(
file.partition(), file.bucket(), file.increment());
break;
case LOG_OFFSET:
LogOffsetCommittable offset =
- LogOffsetCommittable.fromBytes(committable.wrappedCommittable());
+ (LogOffsetCommittable) committable.wrappedCommittable();
fileCommittable.addLogOffset(offset.bucket(), offset.offset());
break;
case LOG:
- throw new UnsupportedOperationException();
+ // log should be committed in local committer
+ break;
}
}
- return new GlobalCommittable<>(logCommittables, fileCommittable);
+ return fileCommittable;
}
@Override
- public void commit(List<GlobalCommittable<LogCommT>> committables) {
+ public void commit(List<ManifestCommittable> committables)
+ throws IOException, InterruptedException {
if (overwritePartition == null) {
- for (GlobalCommittable<LogCommT> committable : committables) {
- fileStoreCommit.commit(committable.fileCommittable(), new HashMap<>());
+ for (ManifestCommittable committable : committables) {
+ fileStoreCommit.commit(committable, new HashMap<>());
}
} else {
- for (GlobalCommittable<LogCommT> committable : committables) {
- fileStoreCommit.overwrite(
- overwritePartition, committable.fileCommittable(), new HashMap<>());
+ for (ManifestCommittable committable : committables) {
+ fileStoreCommit.overwrite(overwritePartition, committable, new HashMap<>());
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
new file mode 100644
index 0000000..d3bda6a
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.store.connector.sink.global.LocalCommitterOperator.convertCommitRequest;
+
+/** Store local {@link Committer} to commit log sink. */
+public class StoreLocalCommitter<LogCommT> implements Committer<Committable> {
+
+ @Nullable private final Committer<LogCommT> logCommitter;
+
+ public StoreLocalCommitter(@Nullable Committer<LogCommT> logCommitter) {
+ this.logCommitter = logCommitter;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<Committable>> requests)
+ throws IOException, InterruptedException {
+ List<CommitRequest<LogCommT>> logRequests = new ArrayList<>();
+ for (CommitRequest<Committable> request : requests) {
+ if (request.getCommittable().kind() == Committable.Kind.LOG) {
+ //noinspection unchecked
+ logRequests.add(
+ convertCommitRequest(
+ request,
+ committable -> (LogCommT) committable.wrappedCommittable(),
+ committable -> new Committable(Committable.Kind.LOG, committable)));
+ }
+ }
+
+ if (logRequests.size() > 0) {
+ Objects.requireNonNull(logCommitter, "logCommitter should not be null.");
+ logCommitter.commit(logRequests);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (logCommitter != null) {
+ logCommitter.close();
+ }
+ }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 9ae59fd..a848ca6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,30 +18,40 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.log.LogInitContext;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogWriteCallback;
+import org.apache.flink.table.store.sink.SinkRecord;
import org.apache.flink.table.store.sink.SinkRecordConverter;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.function.Consumer;
/** {@link Sink} of dynamic store. */
public class StoreSink<WriterStateT, LogCommT>
implements StatefulSink<RowData, WriterStateT>,
- GlobalCommittingSink<RowData, Committable, GlobalCommittable<LogCommT>> {
+ GlobalCommittingSink<RowData, Committable, ManifestCommittable> {
private static final long serialVersionUID = 1L;
@@ -59,6 +69,8 @@ public class StoreSink<WriterStateT, LogCommT>
@Nullable private final Map<String, String> overwritePartition;
+ @Nullable private final LogSinkProvider logSinkProvider;
+
public StoreSink(
ObjectIdentifier tableIdentifier,
FileStore fileStore,
@@ -66,7 +78,8 @@ public class StoreSink<WriterStateT, LogCommT>
int[] primaryKeys,
int numBucket,
@Nullable CatalogLock.Factory lockFactory,
- @Nullable Map<String, String> overwritePartition) {
+ @Nullable Map<String, String> overwritePartition,
+ @Nullable LogSinkProvider logSinkProvider) {
this.tableIdentifier = tableIdentifier;
this.fileStore = fileStore;
this.partitions = partitions;
@@ -74,6 +87,7 @@ public class StoreSink<WriterStateT, LogCommT>
this.numBucket = numBucket;
this.lockFactory = lockFactory;
this.overwritePartition = overwritePartition;
+ this.logSinkProvider = logSinkProvider;
}
@Override
@@ -84,6 +98,19 @@ public class StoreSink<WriterStateT, LogCommT>
@Override
public StoreSinkWriter<WriterStateT> restoreWriter(
InitContext initContext, Collection<WriterStateT> states) throws IOException {
+ SinkWriter<SinkRecord> logWriter = null;
+ LogWriteCallback logCallback = null;
+ if (logSinkProvider != null) {
+ logCallback = new LogWriteCallback();
+ Consumer<?> metadataConsumer = logSinkProvider.createMetadataConsumer(logCallback);
+ LogInitContext logInitContext = new LogInitContext(initContext, metadataConsumer);
+ Sink<SinkRecord> logSink = logSinkProvider.createSink();
+ logWriter =
+ states == null
+ ? logSink.createWriter(logInitContext)
+ : ((StatefulSink<SinkRecord, WriterStateT>) logSink)
+ .restoreWriter(logInitContext, states);
+ }
return new StoreSinkWriter<>(
fileStore.newWrite(),
new SinkRecordConverter(
@@ -91,17 +118,55 @@ public class StoreSink<WriterStateT, LogCommT>
primaryKeys.length > 0 ? fileStore.valueType() : fileStore.keyType(),
partitions,
primaryKeys),
- fileCommitSerializer(),
- overwritePartition != null);
+ overwritePartition != null,
+ logWriter,
+ logCallback);
}
@Override
public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
- return new NoOutputSerializer<>();
+ return logSinkProvider == null
+ ? new NoOutputSerializer<>()
+ : ((StatefulSink<SinkRecord, WriterStateT>) logSinkProvider.createSink())
+ .getWriterStateSerializer();
+ }
+
+ @Nullable
+ private Committer<LogCommT> logCommitter() {
+ if (logSinkProvider != null) {
+ Sink<SinkRecord> sink = logSinkProvider.createSink();
+ if (sink instanceof TwoPhaseCommittingSink) {
+ try {
+ return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink).createCommitter();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ return null;
+ }
+
+ @Nullable
+ private SimpleVersionedSerializer<LogCommT> logCommitSerializer() {
+ if (logSinkProvider != null) {
+ Sink<SinkRecord> sink = logSinkProvider.createSink();
+ if (sink instanceof TwoPhaseCommittingSink) {
+ return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink)
+ .getCommittableSerializer();
+ }
+ }
+
+ return null;
}
@Override
- public StoreGlobalCommitter<LogCommT> createGlobalCommitter() {
+ public Committer<Committable> createCommitter() {
+ return new StoreLocalCommitter<>(logCommitter());
+ }
+
+ @Override
+ public StoreGlobalCommitter createGlobalCommitter() {
FileStoreCommit commit = fileStore.newCommit();
CatalogLock lock;
if (lockFactory == null) {
@@ -120,22 +185,20 @@ public class StoreSink<WriterStateT, LogCommT>
});
}
- return new StoreGlobalCommitter<>(
- commit, fileStore.newExpire(), fileCommitSerializer(), lock, overwritePartition);
+ return new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, overwritePartition);
}
+ @SuppressWarnings("unchecked")
@Override
public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
- return CommittableSerializer.INSTANCE;
+ return new CommittableSerializer(
+ fileCommitSerializer(), (SimpleVersionedSerializer<Object>) logCommitSerializer());
}
@Override
- public GlobalCommittableSerializer<LogCommT> getGlobalCommittableSerializer() {
- ManifestCommittableSerializer fileCommSerializer =
- new ManifestCommittableSerializer(
- fileStore.partitionType(), fileStore.keyType(), fileStore.valueType());
- SimpleVersionedSerializer<LogCommT> logCommitSerializer = new NoOutputSerializer<>();
- return new GlobalCommittableSerializer<>(logCommitSerializer, fileCommSerializer);
+ public ManifestCommittableSerializer getGlobalCommittableSerializer() {
+ return new ManifestCommittableSerializer(
+ fileStore.partitionType(), fileStore.keyType(), fileStore.valueType());
}
private FileCommittableSerializer fileCommitSerializer() {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index ccc2482..6fea71d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -28,16 +28,21 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.sink.SinkRecord;
import org.apache.flink.table.store.sink.SinkRecordConverter;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -50,10 +55,12 @@ public class StoreSinkWriter<WriterStateT>
private final SinkRecordConverter recordConverter;
- private final FileCommittableSerializer fileCommitSerializer;
-
private final boolean overwrite;
+ @Nullable private final SinkWriter<SinkRecord> logWriter;
+
+ @Nullable private final LogWriteCallback logCallback;
+
private final ExecutorService compactExecutor;
private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
@@ -61,12 +68,14 @@ public class StoreSinkWriter<WriterStateT>
public StoreSinkWriter(
FileStoreWrite fileStoreWrite,
SinkRecordConverter recordConverter,
- FileCommittableSerializer fileCommitSerializer,
- boolean overwrite) {
+ boolean overwrite,
+ @Nullable SinkWriter<SinkRecord> logWriter,
+ @Nullable LogWriteCallback logCallback) {
this.fileStoreWrite = fileStoreWrite;
this.recordConverter = recordConverter;
- this.fileCommitSerializer = fileCommitSerializer;
this.overwrite = overwrite;
+ this.logWriter = logWriter;
+ this.logCallback = logCallback;
this.compactExecutor = Executors.newSingleThreadScheduledExecutor();
this.writers = new HashMap<>();
}
@@ -95,6 +104,11 @@ public class StoreSinkWriter<WriterStateT>
} catch (Exception e) {
throw new IOException(e);
}
+
+ // write to log store
+ if (logWriter != null) {
+ logWriter.write(record, context);
+ }
}
private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
@@ -119,15 +133,22 @@ public class StoreSinkWriter<WriterStateT>
}
@Override
- public void flush(boolean endOfInput) {}
+ public void flush(boolean endOfInput) throws IOException, InterruptedException {
+ if (logWriter != null) {
+ logWriter.flush(endOfInput);
+ }
+ }
@Override
public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
+ if (logWriter != null && logWriter instanceof StatefulSinkWriter) {
+ return ((StatefulSinkWriter<?, WriterStateT>) logWriter).snapshotState(checkpointId);
+ }
return Collections.emptyList();
}
@Override
- public List<Committable> prepareCommit() throws IOException {
+ public List<Committable> prepareCommit() throws IOException, InterruptedException {
List<Committable> committables = new ArrayList<>();
Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
writers.entrySet().iterator();
@@ -146,11 +167,7 @@ public class StoreSinkWriter<WriterStateT>
} catch (Exception e) {
throw new IOException(e);
}
- committables.add(
- new Committable(
- Committable.Kind.FILE,
- fileCommitSerializer.serialize(committable),
- fileCommitSerializer.getVersion()));
+ committables.add(new Committable(Committable.Kind.FILE, committable));
// clear if no update
// we need a mechanism to clear writers, otherwise there will be more and more
@@ -166,6 +183,25 @@ public class StoreSinkWriter<WriterStateT>
}
}
+ if (logWriter != null) {
+ if (logWriter instanceof PrecommittingSinkWriter) {
+ Collection<?> logCommittables =
+ ((PrecommittingSinkWriter<?, ?>) logWriter).prepareCommit();
+ for (Object logCommittable : logCommittables) {
+ committables.add(new Committable(Committable.Kind.LOG, logCommittable));
+ }
+ }
+
+ Objects.requireNonNull(logCallback, "logCallback should not be null.");
+ logCallback
+ .offsets()
+ .forEach(
+ (k, v) ->
+ committables.add(
+ new Committable(
+ Committable.Kind.LOG_OFFSET,
+ new LogOffsetCommittable(k, v)));
+ }
return committables;
}
@@ -187,6 +223,10 @@ public class StoreSinkWriter<WriterStateT>
}
}
writers.clear();
+
+ if (logWriter != null) {
+ logWriter.close();
+ }
}
@VisibleForTesting
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
similarity index 62%
copy from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
copy to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
index 36b1680..7b807bb 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
@@ -39,28 +39,23 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/** An operator that processes committables of a {@link Sink}. */
-public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
- implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+public abstract class AbstractCommitterOperator<IN, CommT>
+ extends AbstractStreamOperator<CommittableMessage<IN>>
+ implements OneInputStreamOperator<CommittableMessage<IN>, CommittableMessage<IN>>,
+ BoundedOneInput {
- private static final Logger LOG = LoggerFactory.getLogger(GlobalCommitterOperator.class);
+ private static final long serialVersionUID = 1L;
- /** Record all the committables until commit. */
- private final Deque<CommT> committables = new ArrayDeque<>();
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitterOperator.class);
- /**
- * Aggregate committables to global committables and commit the global committables to the
- * external system.
- */
- private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
+ /** Record all the inputs until commit. */
+ private final Deque<IN> inputs = new ArrayDeque<>();
/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -68,21 +63,16 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
"streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
/** Group the committable by the checkpoint id. */
- private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
+ private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
/** The committable's serializer. */
- private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>>
- committableSerializer;
+ private final SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer;
/** The operator's state. */
- private ListState<GlobalCommT> streamingCommitterState;
-
- private GlobalCommitter<CommT, GlobalCommT> committer;
+ private ListState<CommT> streamingCommitterState;
- public GlobalCommitterOperator(
- SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory,
- SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> committableSerializer) {
- this.committerFactory = checkNotNull(committerFactory);
+ public AbstractCommitterOperator(
+ SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
this.committableSerializer = committableSerializer;
this.committablesPerCheckpoint = new TreeMap<>();
setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -91,36 +81,43 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- committer = committerFactory.get();
streamingCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
committableSerializer.get());
- List<GlobalCommT> restored = new ArrayList<>();
+ List<CommT> restored = new ArrayList<>();
streamingCommitterState.get().forEach(restored::add);
streamingCommitterState.clear();
- committer.commit(committer.filterRecoveredCommittables(restored));
+ commit(true, restored);
}
+ public abstract void commit(boolean isRecover, List<CommT> committables) throws Exception;
+
+ public abstract List<CommT> toCommittables(long checkpoint, List<IN> inputs) throws Exception;
+
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
- List<CommT> committables = pollCommittables();
- if (committables.size() > 0) {
+ List<IN> poll = pollInputs();
+ if (poll.size() > 0) {
committablesPerCheckpoint.put(
- context.getCheckpointId(),
- committer.combine(context.getCheckpointId(), committables));
+ context.getCheckpointId(), toCommittables(context.getCheckpointId(), poll));
}
- streamingCommitterState.update(new ArrayList<>(committablesPerCheckpoint.values()));
+ streamingCommitterState.update(committables(committablesPerCheckpoint));
+ }
+
+ private List<CommT> committables(NavigableMap<Long, List<CommT>> map) {
+ List<CommT> committables = new ArrayList<>();
+ map.values().forEach(committables::addAll);
+ return committables;
}
@Override
public void endInput() throws Exception {
- List<CommT> allCommittables = pollCommittables();
- if (!allCommittables.isEmpty()) {
- committer.commit(
- Collections.singletonList(committer.combine(Long.MAX_VALUE, allCommittables)));
+ List<IN> poll = pollInputs();
+ if (!poll.isEmpty()) {
+ commit(false, toCommittables(Long.MAX_VALUE, poll));
}
}
@@ -128,31 +125,31 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
LOG.info("Committing the state for checkpoint {}", checkpointId);
- NavigableMap<Long, GlobalCommT> headMap =
+ NavigableMap<Long, List<CommT>> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
- committer.commit(new ArrayList<>(headMap.values()));
+ commit(false, committables(headMap));
headMap.clear();
}
@Override
- public void processElement(StreamRecord<CommittableMessage<CommT>> element) {
- CommittableMessage<CommT> message = element.getValue();
+ public void processElement(StreamRecord<CommittableMessage<IN>> element) {
+ output.collect(element);
+ CommittableMessage<IN> message = element.getValue();
if (message instanceof CommittableWithLineage) {
- this.committables.add(((CommittableWithLineage<CommT>) message).getCommittable());
+ this.inputs.add(((CommittableWithLineage<IN>) message).getCommittable());
}
}
@Override
public void close() throws Exception {
- committer.close();
committablesPerCheckpoint.clear();
- committables.clear();
+ inputs.clear();
super.close();
}
- private List<CommT> pollCommittables() {
- List<CommT> committables = new ArrayList<>(this.committables);
- this.committables.clear();
- return committables;
+ private List<IN> pollInputs() {
+ List<IN> poll = new ArrayList<>(this.inputs);
+ this.inputs.clear();
+ return poll;
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
index 36b1680..a606bf9 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
@@ -17,142 +17,60 @@
package org.apache.flink.table.store.connector.sink.global;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.function.SerializableSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.Collections;
-import java.util.Deque;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** An operator that processes committables of a {@link Sink}. */
-public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
- implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+/** An {@link AbstractCommitterOperator} to process global committer. */
+public class GlobalCommitterOperator<CommT, GlobalCommT>
+ extends AbstractCommitterOperator<CommT, GlobalCommT> {
- private static final Logger LOG = LoggerFactory.getLogger(GlobalCommitterOperator.class);
+ private static final long serialVersionUID = 1L;
- /** Record all the committables until commit. */
- private final Deque<CommT> committables = new ArrayDeque<>();
+ private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
/**
* Aggregate committables to global committables and commit the global committables to the
* external system.
*/
- private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
-
- /** The operator's state descriptor. */
- private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
- new ListStateDescriptor<>(
- "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
-
- /** Group the committable by the checkpoint id. */
- private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
-
- /** The committable's serializer. */
- private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>>
- committableSerializer;
-
- /** The operator's state. */
- private ListState<GlobalCommT> streamingCommitterState;
-
private GlobalCommitter<CommT, GlobalCommT> committer;
public GlobalCommitterOperator(
SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> committableSerializer) {
+ super(committableSerializer);
this.committerFactory = checkNotNull(committerFactory);
- this.committableSerializer = committableSerializer;
- this.committablesPerCheckpoint = new TreeMap<>();
- setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
committer = committerFactory.get();
- streamingCommitterState =
- new SimpleVersionedListState<>(
- context.getOperatorStateStore()
- .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
- committableSerializer.get());
- List<GlobalCommT> restored = new ArrayList<>();
- streamingCommitterState.get().forEach(restored::add);
- streamingCommitterState.clear();
- committer.commit(committer.filterRecoveredCommittables(restored));
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
- List<CommT> committables = pollCommittables();
- if (committables.size() > 0) {
- committablesPerCheckpoint.put(
- context.getCheckpointId(),
- committer.combine(context.getCheckpointId(), committables));
- }
- streamingCommitterState.update(new ArrayList<>(committablesPerCheckpoint.values()));
+ super.initializeState(context);
}
@Override
- public void endInput() throws Exception {
- List<CommT> allCommittables = pollCommittables();
- if (!allCommittables.isEmpty()) {
- committer.commit(
- Collections.singletonList(committer.combine(Long.MAX_VALUE, allCommittables)));
+ public void commit(boolean isRecover, List<GlobalCommT> committables)
+ throws IOException, InterruptedException {
+ if (isRecover) {
+ committables = committer.filterRecoveredCommittables(committables);
}
+ committer.commit(committables);
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- super.notifyCheckpointComplete(checkpointId);
- LOG.info("Committing the state for checkpoint {}", checkpointId);
- NavigableMap<Long, GlobalCommT> headMap =
- committablesPerCheckpoint.headMap(checkpointId, true);
- committer.commit(new ArrayList<>(headMap.values()));
- headMap.clear();
- }
-
- @Override
- public void processElement(StreamRecord<CommittableMessage<CommT>> element) {
- CommittableMessage<CommT> message = element.getValue();
- if (message instanceof CommittableWithLineage) {
- this.committables.add(((CommittableWithLineage<CommT>) message).getCommittable());
- }
+ public List<GlobalCommT> toCommittables(long checkpoint, List<CommT> inputs) throws Exception {
+ return Collections.singletonList(committer.combine(checkpoint, inputs));
}
@Override
public void close() throws Exception {
committer.close();
- committablesPerCheckpoint.clear();
- committables.clear();
super.close();
}
-
- private List<CommT> pollCommittables() {
- List<CommT> committables = new ArrayList<>(this.committables);
- this.committables.clear();
- return committables;
- }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
index 4b2b512..90d97b9 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.connector.sink.global;
-import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
@@ -36,11 +35,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
public interface GlobalCommittingSink<InputT, CommT, GlobalCommT>
extends TwoPhaseCommittingSink<InputT, CommT> {
- @Override
- default Committer<CommT> createCommitter() {
- throw new UnsupportedOperationException("Please create global committer.");
- }
-
/**
* Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
* through {@link GlobalCommitter#commit}.
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
index e547642..c034276 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.connector.sink.global;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -28,38 +27,47 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
/** A translator for the {@link GlobalCommittingSink}. */
public class GlobalCommittingSinkTranslator {
- private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
-
private static final String WRITER_NAME = "Writer";
+ private static final String LOCAL_COMMITTER_NAME = "Local Committer";
+
+ private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+
public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink) {
TypeInformation<CommittableMessage<CommT>> commitType =
CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
- boolean checkpointingEnabled =
- input.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled();
-
- // We cannot determine the mode, when the execution mode is auto.
- // We set isBatch to false and only use checkpointingEnabled to determine if we want to do
- // the final commit.
- // When isBatch is true, only the checkpointID is different, which has no effect on the
- // commit operator.
-
SingleOutputStreamOperator<CommittableMessage<CommT>> written =
- input.transform(
- WRITER_NAME,
- commitType,
- new SinkWriterOperatorFactory<>(sink, false, checkpointingEnabled));
+ input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink))
+ .setParallelism(input.getParallelism());
+
+ SingleOutputStreamOperator<CommittableMessage<CommT>> local =
+ written.transform(
+ LOCAL_COMMITTER_NAME,
+ commitType,
+ new LocalCommitterOperator<>(
+ () -> {
+ try {
+ return sink.createCommitter();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ },
+ sink::getCommittableSerializer))
+ .setParallelism(written.getParallelism());
- SingleOutputStreamOperator<Void> committed =
- written.global()
+ SingleOutputStreamOperator<?> committed =
+ local.global()
.transform(
GLOBAL_COMMITTER_NAME,
- Types.VOID,
+ commitType,
new GlobalCommitterOperator<>(
sink::createGlobalCommitter,
sink::getGlobalCommittableSerializer))
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
new file mode 100644
index 0000000..9e7974c
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link AbstractCommitterOperator} to process local committer. */
+public class LocalCommitterOperator<CommT> extends AbstractCommitterOperator<CommT, CommT> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableSupplier<Committer<CommT>> committerFactory;
+
+ private Committer<CommT> committer;
+
+ public LocalCommitterOperator(
+ SerializableSupplier<Committer<CommT>> committerFactory,
+ SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
+ super(committableSerializer);
+ this.committerFactory = checkNotNull(committerFactory);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ committer = committerFactory.get();
+ super.initializeState(context);
+ }
+
+ @Override
+ public void commit(boolean isRecover, List<CommT> committables)
+ throws IOException, InterruptedException {
+ if (committables.isEmpty()) {
+ return;
+ }
+
+ List<CommitRequestImpl> requests = new ArrayList<>(committables.size());
+ for (CommT comm : committables) {
+ requests.add(new CommitRequestImpl(comm));
+ }
+
+ long sleep = 1000;
+ while (true) {
+ // commit
+ requests.forEach(CommitRequestImpl::setSelected);
+ committer.commit(new ArrayList<>(requests));
+ requests.forEach(CommitRequestImpl::setCommittedIfNoError);
+
+ // drain finished
+ requests.removeIf(CommitRequestImpl::isFinished);
+ if (requests.isEmpty()) {
+ return;
+ }
+
+ //noinspection BusyWait
+ Thread.sleep(sleep);
+ sleep *= 2;
+ }
+ }
+
+ @Override
+ public List<CommT> toCommittables(long checkpoint, List<CommT> inputs) {
+ return inputs;
+ }
+
+ @Override
+ public void close() throws Exception {
+ committer.close();
+ super.close();
+ }
+
+ /** {@link CommitRequest} implementation. */
+ public class CommitRequestImpl implements CommitRequest<CommT> {
+
+ private CommT committable;
+ private int numRetries;
+ private CommitRequestState state;
+
+ private CommitRequestImpl(CommT committable) {
+ this.committable = committable;
+ this.state = CommitRequestState.RECEIVED;
+ }
+
+ private boolean isFinished() {
+ return state.isFinalState();
+ }
+
+ @Override
+ public CommT getCommittable() {
+ return this.committable;
+ }
+
+ @Override
+ public int getNumberOfRetries() {
+ return this.numRetries;
+ }
+
+ @Override
+ public void signalFailedWithKnownReason(Throwable t) {
+ this.state = CommitRequestState.FAILED;
+ }
+
+ @Override
+ public void signalFailedWithUnknownReason(Throwable t) {
+ this.state = CommitRequestState.FAILED;
+ throw new IllegalStateException("Failed to commit " + this.committable, t);
+ }
+
+ @Override
+ public void retryLater() {
+ this.state = CommitRequestState.RETRY;
+ ++this.numRetries;
+ }
+
+ @Override
+ public void updateAndRetryLater(CommT committable) {
+ this.committable = committable;
+ this.retryLater();
+ }
+
+ @Override
+ public void signalAlreadyCommitted() {
+ this.state = CommitRequestState.COMMITTED;
+ }
+
+ void setSelected() {
+ state = CommitRequestState.RECEIVED;
+ }
+
+ void setCommittedIfNoError() {
+ if (state == CommitRequestState.RECEIVED) {
+ state = CommitRequestState.COMMITTED;
+ }
+ }
+ }
+
+ /** Convert a {@link CommitRequest} to another type. */
+    public static <CommT, NewT> CommitRequest<NewT> convertCommitRequest(
+            CommitRequest<CommT> request, Function<CommT, NewT> to, Function<NewT, CommT> from) {
+ return new CommitRequest<NewT>() {
+
+ @Override
+ public NewT getCommittable() {
+ return to.apply(request.getCommittable());
+ }
+
+ @Override
+ public int getNumberOfRetries() {
+ return request.getNumberOfRetries();
+ }
+
+ @Override
+ public void signalFailedWithKnownReason(Throwable throwable) {
+ request.signalFailedWithKnownReason(throwable);
+ }
+
+ @Override
+ public void signalFailedWithUnknownReason(Throwable throwable) {
+ request.signalFailedWithUnknownReason(throwable);
+ }
+
+ @Override
+ public void retryLater() {
+ request.retryLater();
+ }
+
+ @Override
+ public void updateAndRetryLater(NewT committable) {
+ request.updateAndRetryLater(from.apply(committable));
+ }
+
+ @Override
+ public void signalAlreadyCommitted() {
+ request.signalAlreadyCommitted();
+ }
+ };
+ }
+}
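The retry loop in commit() above backs off exponentially: any request whose committer calls retryLater() stays pending, while requests that reach a final state are dropped. A minimal sketch of a Committer that fails its first attempt and succeeds on the second, illustrating how the loop converges (the OnceFailingCommitter class is illustrative only, not part of this commit; it mirrors the RetryOnceCommitter used in the tests below):

    // Illustrative sketch only, assuming the Flink sink2 Committer interface.
    import org.apache.flink.api.connector.sink2.Committer;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    class OnceFailingCommitter implements Committer<String> {

        private final List<String> committed = new ArrayList<>();
        private boolean failedOnce;

        @Override
        public void commit(Collection<CommitRequest<String>> requests) {
            for (CommitRequest<String> request : requests) {
                if (!failedOnce) {
                    failedOnce = true;
                    // LocalCommitterOperator will sleep 1s, 2s, 4s, ... and re-commit.
                    request.retryLater();
                } else {
                    // Leaving the request untouched keeps it in RECEIVED state, so
                    // setCommittedIfNoError() marks it COMMITTED after this call.
                    committed.add(request.getCommittable());
                }
            }
        }

        @Override
        public void close() {}
    }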
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 28f2f8f..f671476 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -39,6 +38,7 @@ import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -77,7 +77,7 @@ public class FileStoreITCase extends AbstractTestBase {
    private static final RowType KEY_TYPE =
            new RowType(Collections.singletonList(new RowType.RowField("k", new IntType())));
- private static final RowType VALUE_TYPE =
+ public static final RowType VALUE_TYPE =
new RowType(
Arrays.asList(
new RowType.RowField("v", new IntType()),
@@ -86,7 +86,7 @@ public class FileStoreITCase extends AbstractTestBase {
new RowType.RowField("_k", new IntType())));
@SuppressWarnings({"unchecked", "rawtypes"})
- private static final DataStructureConverter<RowData, Row> CONVERTER =
+ public static final DataStructureConverter<RowData, Row> CONVERTER =
(DataStructureConverter)
DataStructureConverters.getConverter(
TypeConversions.fromLogicalToDataType(VALUE_TYPE));
@@ -209,10 +209,10 @@ public class FileStoreITCase extends AbstractTestBase {
return env;
}
-    public static Configuration buildConfiguration(boolean isBatch, File folder) {
+    public static Configuration buildConfiguration(boolean noFail, File folder) {
Configuration options = new Configuration();
options.set(BUCKET, NUM_BUCKET);
- if (isBatch) {
+ if (noFail) {
options.set(FILE_PATH, folder.toURI().toString());
} else {
FailingAtomicRenameFileSystem.get().reset(3, 100);
@@ -237,7 +237,7 @@ public class FileStoreITCase extends AbstractTestBase {
return isBatch
                ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(VALUE_TYPE))
: env.addSource(
-                        new FiniteTestSource<>(null, SOURCE_DATA), InternalTypeInfo.of(VALUE_TYPE));
+                        new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(VALUE_TYPE));
}
    public static void write(DataStream<RowData> input, FileStore fileStore, boolean partitioned)
@@ -251,11 +251,28 @@ public class FileStoreITCase extends AbstractTestBase {
boolean partitioned,
@Nullable Map<String, String> overwritePartition)
throws Exception {
+ write(input, fileStore, partitioned, overwritePartition, null);
+ }
+
+ public static void write(
+ DataStream<RowData> input,
+ FileStore fileStore,
+ boolean partitioned,
+ @Nullable Map<String, String> overwritePartition,
+ @Nullable LogSinkProvider logSinkProvider)
+ throws Exception {
int[] partitions = partitioned ? new int[] {1} : new int[0];
int[] keys = new int[] {2};
StoreSink<?, ?> sink =
new StoreSink<>(
-                        null, fileStore, partitions, keys, NUM_BUCKET, null, overwritePartition);
+ null,
+ fileStore,
+ partitions,
+ keys,
+ NUM_BUCKET,
+ null,
+ overwritePartition,
+ logSinkProvider);
input = input.keyBy(row -> row.getInt(2)); // key by
GlobalCommittingSinkTranslator.translate(input, sink);
input.getExecutionEnvironment().execute();
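The new @Nullable LogSinkProvider argument is the integration point of this commit: when it is present, records written to the file store are also written to the log store. A condensed sketch of driving the new overload, assuming a test temporary folder named folder (passing null as the last argument keeps the previous file-store-only behavior):

    StreamExecutionEnvironment env = FileStoreITCase.buildStreamEnv();
    DataStreamSource<RowData> source = FileStoreITCase.buildTestSource(env, false);
    FileStore fileStore =
            FileStoreITCase.buildFileStore(
                    FileStoreITCase.buildConfiguration(true, folder), true);
    // Last argument: a LogSinkProvider (e.g. from KafkaLogStoreFactory), or null for no log.
    FileStoreITCase.write(source, fileStore, true, null, null);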
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
new file mode 100644
index 0000000..0e60cea
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A stream source that: 1) emits a list of elements without allowing checkpoints, 2) waits for two
+ * more checkpoints to complete, 3) re-emits the same elements, and 4) waits for another two
+ * checkpoints before 5) exiting.
+ *
+ * <p>This is a rewrite of Flink's {@code FiniteTestSource} that additionally implements {@link
+ * CheckpointedFunction}, so the number of emit rounds survives a restore.
+ */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class FiniteTestSource<T>
+        implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 1L;
+
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Iterable<T> elements;
+
+ private volatile boolean running = true;
+
+ private transient int numCheckpointsComplete;
+
+ private transient ListState<Integer> checkpointedState;
+
+ private volatile int numTimesEmitted;
+
+ public FiniteTestSource(Iterable<T> elements) {
+ this.elements = elements;
+ }
+
+ @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.checkpointedState =
+ context.getOperatorStateStore()
+ .getListState(
+                                new ListStateDescriptor<>("emit-times", IntSerializer.INSTANCE));
+
+ if (context.isRestored()) {
+ List<Integer> retrievedStates = new ArrayList<>();
+ for (Integer entry : this.checkpointedState.get()) {
+ retrievedStates.add(entry);
+ }
+
+            // given that the parallelism of the function is 1, we can only have 1 state
+ Preconditions.checkArgument(
+ retrievedStates.size() == 1,
+ getClass().getSimpleName() + " retrieved invalid state.");
+
+ this.numTimesEmitted = retrievedStates.get(0);
+ Preconditions.checkArgument(
+ numTimesEmitted <= 2,
+ getClass().getSimpleName()
+ + " retrieved invalid numTimesEmitted: "
+ + numTimesEmitted);
+ } else {
+ this.numTimesEmitted = 0;
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> ctx) throws Exception {
+ switch (numTimesEmitted) {
+ case 0:
+ emitElementsAndWaitForCheckpoints(ctx);
+ emitElementsAndWaitForCheckpoints(ctx);
+ break;
+ case 1:
+ emitElementsAndWaitForCheckpoints(ctx);
+ break;
+ case 2:
+                // the last notifyCheckpointComplete may have been missed; wait for the next one
+ final Object lock = ctx.getCheckpointLock();
+ synchronized (lock) {
+ int checkpointToAwait = numCheckpointsComplete + 2;
+                    while (running && numCheckpointsComplete < checkpointToAwait) {
+ lock.wait(1);
+ }
+ }
+ break;
+ }
+ }
+
+ private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx)
+ throws InterruptedException {
+ final Object lock = ctx.getCheckpointLock();
+
+ final int checkpointToAwait;
+ synchronized (lock) {
+ checkpointToAwait = numCheckpointsComplete + 2;
+ for (T t : elements) {
+ ctx.collect(t);
+ }
+ numTimesEmitted++;
+ }
+
+ synchronized (lock) {
+ while (running && numCheckpointsComplete < checkpointToAwait) {
+ lock.wait(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numCheckpointsComplete++;
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {}
+
+ @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkState(
+ this.checkpointedState != null,
+ "The " + getClass().getSimpleName() + " has not been properly
initialized.");
+
+ this.checkpointedState.clear();
+ this.checkpointedState.add(this.numTimesEmitted);
+ }
+}
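Because run() blocks until enough checkpoints complete, a pipeline using this source only terminates when checkpointing is enabled; without it the source waits forever. A minimal wiring sketch (the interval and elements are illustrative, and the usual flink-streaming imports are assumed):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1); // the restore check above assumes parallelism 1
    env.enableCheckpointing(100); // required: the source waits on completed checkpoints
    DataStream<Integer> stream =
            env.addSource(new FiniteTestSource<>(Arrays.asList(1, 2, 3)), Types.INT);
    // Downstream operators see every element twice, once per emit round.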
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 2d59440..17733f7 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -18,21 +18,76 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+
+import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link CommittableSerializer}. */
public class CommittableSerializerTest {
+ private final FileCommittableSerializer fileSerializer =
+ new FileCommittableSerializer(
+ RowType.of(new IntType()),
+ RowType.of(new IntType()),
+ RowType.of(new IntType()));
+
+ private final CommittableSerializer serializer =
+ new CommittableSerializer(
+ fileSerializer,
+                    (SimpleVersionedSerializer) TestSink.StringCommittableSerializer.INSTANCE);
+
+ @Test
+ public void testFile() throws IOException {
+ Increment increment = randomIncrement();
+        FileCommittable committable = new FileCommittable(row(0), 1, increment);
+ FileCommittable newCommittable =
+ (FileCommittable)
+ serializer
+ .deserialize(
+ 1,
+ serializer.serialize(
+ new Committable(
+                                                Committable.Kind.FILE, committable)))
+ .wrappedCommittable();
+ assertThat(newCommittable).isEqualTo(committable);
+ }
+
+ @Test
+ public void testLogOffset() throws IOException {
+ LogOffsetCommittable committable = new LogOffsetCommittable(2, 3);
+ LogOffsetCommittable newCommittable =
+ (LogOffsetCommittable)
+ serializer
+ .deserialize(
+ 1,
+ serializer.serialize(
+ new Committable(
+                                                Committable.Kind.LOG_OFFSET, committable)))
+ .wrappedCommittable();
+ assertThat(newCommittable).isEqualTo(committable);
+ }
+
@Test
- public void test() {
- byte[] bytes = new byte[] {4, 5, 1};
-        Committable committable = new Committable(Committable.Kind.LOG, bytes, 9);
-        byte[] serialize = CommittableSerializer.INSTANCE.serialize(committable);
-        Committable deser = CommittableSerializer.INSTANCE.deserialize(1, serialize);
- assertThat(deser.kind()).isEqualTo(Committable.Kind.LOG);
- assertThat(deser.serializerVersion()).isEqualTo(9);
- assertThat(deser.wrappedCommittable()).isEqualTo(bytes);
+ public void testLog() throws IOException {
+ String log = "random_string";
+ String newCommittable =
+ (String)
+ serializer
+ .deserialize(
+ 1,
+ serializer.serialize(
+                                        new Committable(Committable.Kind.LOG, log)))
+ .wrappedCommittable();
+ assertThat(newCommittable).isEqualTo(log);
}
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
deleted file mode 100644
index 31753c3..0000000
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalCommittableSerializer}. */
-public class GlobalCommittableSerializerTest {
-
- @Test
- public void test() throws IOException {
- List<String> logs = Arrays.asList("1", "2");
- GlobalCommittable<String> committable =
-                new GlobalCommittable<>(logs, ManifestCommittableSerializerTest.create());
-        GlobalCommittableSerializer<String> serializer =
-                new GlobalCommittableSerializer<>(
-                        new TestStringSerializer(), ManifestCommittableSerializerTest.serializer());
-        byte[] serialized = serializer.serialize(committable);
-        GlobalCommittable<String> deser = serializer.deserialize(1, serialized);
-        assertThat(deser.logCommittables()).isEqualTo(committable.logCommittables());
-        assertThat(deser.fileCommittable()).isEqualTo(committable.fileCommittable());
- }
-
-    private static final class TestStringSerializer implements SimpleVersionedSerializer<String> {
-
- private static final int VERSION = 1073741823;
-
- private TestStringSerializer() {}
-
- public int getVersion() {
- return VERSION;
- }
-
- public byte[] serialize(String str) {
- return str.getBytes(StandardCharsets.UTF_8);
- }
-
- public String deserialize(int version, byte[] serialized) {
- assertThat(version).isEqualTo(VERSION);
- return new String(serialized, StandardCharsets.UTF_8);
- }
- }
-}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
new file mode 100644
index 0000000..023b4b6
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.FileStoreITCase;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
+import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
+import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
+import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildBatchEnv;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildConfiguration;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileStore;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for the table store sink with a Kafka log store. */
+public class LogStoreSinkITCase extends KafkaTableTestBase {
+
+ @Test
+ public void testStreamingPartitioned() throws Exception {
+ innerTest("testStreamingPartitioned", false, true, true);
+ }
+
+ @Test
+ public void testStreamingNonPartitioned() throws Exception {
+ innerTest("testStreamingNonPartitioned", false, false, true);
+ }
+
+ @Test
+ public void testBatchPartitioned() throws Exception {
+ innerTest("testBatchPartitioned", true, true, true);
+ }
+
+ @Test
+ public void testStreamingEventual() throws Exception {
+ innerTest("testStreamingEventual", false, true, false);
+ }
+
+    private void innerTest(String name, boolean isBatch, boolean partitioned, boolean transaction)
+            throws Exception {
+        StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
+
+        // in eventual mode, a failure would result in duplicate data, so disable failure injection
+        FileStore fileStore =
+                buildFileStore(
+                        buildConfiguration(isBatch || !transaction, TEMPORARY_FOLDER.newFolder()),
+ partitioned);
+
+ // prepare log
+ DynamicTableFactory.Context context =
+ KafkaLogTestUtils.testContext(
+ name,
+ getBootstrapServers(),
+ LogOptions.LogChangelogMode.AUTO,
+ transaction
+ ? LogOptions.LogConsistency.TRANSACTIONAL
+ : LogOptions.LogConsistency.EVENTUAL,
+ FileStoreITCase.VALUE_TYPE,
+ new int[] {2});
+
+ KafkaLogStoreFactory factory = discoverKafkaLogFactory();
+        KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
+        KafkaLogSourceProvider sourceProvider =
+                factory.createSourceProvider(context, SOURCE_CONTEXT);
+
+ factory.onCreateTable(context, 3, true);
+
+ try {
+ // write
+            DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
+            FileStoreITCase.write(finiteSource, fileStore, partitioned, null, sinkProvider);
+
+ // read
+ List<Row> results = FileStoreITCase.read(env, fileStore);
+
+ Row[] expected =
+ partitioned
+ ? new Row[] {
+ Row.of(5, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p1", 1),
+ Row.of(0, "p1", 2)
+ }
+ : new Row[] {
+                            Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
+ };
+ assertThat(results).containsExactlyInAnyOrder(expected);
+
+ results =
+ buildStreamEnv()
+ .fromSource(
+ sourceProvider.createSource(null),
+ WatermarkStrategy.noWatermarks(),
+ "source")
+ .executeAndCollect(isBatch ? 6 : 12).stream()
+ .map(FileStoreITCase.CONVERTER::toExternal)
+ .collect(Collectors.toList());
+
+ if (isBatch) {
+ expected =
+ new Row[] {
+ Row.of(0, "p1", 1),
+ Row.of(0, "p1", 2),
+ Row.of(5, "p1", 1),
+ Row.of(6, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p2", 1)
+ };
+ } else {
+                // read log: expect the original data twice (FiniteTestSource emits two rounds)
+ expected =
+ new Row[] {
+ Row.of(0, "p1", 1),
+ Row.of(0, "p1", 2),
+ Row.of(5, "p1", 1),
+ Row.of(6, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p2", 1),
+ Row.of(0, "p1", 1),
+ Row.of(0, "p1", 2),
+ Row.of(5, "p1", 1),
+ Row.of(6, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p2", 1)
+ };
+ }
+ assertThat(results).containsExactlyInAnyOrder(expected);
+ } finally {
+ factory.onDropTable(context, true);
+ }
+ }
+}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index fd3d97f..e2ef3e7 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
@@ -123,7 +124,8 @@ public class StoreSinkTest {
primaryKeys,
2,
() -> lock,
- new HashMap<>());
+ new HashMap<>(),
+ null);
writeAndCommit(
sink,
GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
@@ -225,7 +227,7 @@ public class StoreSinkTest {
    private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
StoreGlobalCommitter committer = sink.createGlobalCommitter();
-        GlobalCommittable<?> committable = committer.combine(0, fileCommittables);
+        ManifestCommittable committable = committer.combine(0, fileCommittables);
fileStore.expired = false;
lock.locked = false;
@@ -246,7 +248,14 @@ public class StoreSinkTest {
private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
return new StoreSink<>(
-                identifier, fileStore, partitions, primaryKeys, 2, () -> lock, overwritePartition);
+ identifier,
+ fileStore,
+ partitions,
+ primaryKeys,
+ 2,
+ () -> lock,
+ overwritePartition,
+ null);
}
private class TestLock implements CatalogLock {
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
index 3bbead5..17cbba1 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -49,7 +49,7 @@ public class GlobalCommitterOperatorTest {
@Test
public void closeCommitter() throws Exception {
        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
testHarness.initializeEmptyState();
testHarness.open();
@@ -69,7 +69,7 @@ public class GlobalCommitterOperatorTest {
buildSubtaskState(createTestHarness(), input2);
        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
final OperatorSubtaskState mergedOperatorSubtaskState =
@@ -108,7 +108,7 @@ public class GlobalCommitterOperatorTest {
expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
testHarness.initializeEmptyState();
testHarness.open();
@@ -138,7 +138,7 @@ public class GlobalCommitterOperatorTest {
final DefaultGlobalCommitter globalCommitter =
new DefaultGlobalCommitter(successCommittedCommittable);
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
// all data from previous checkpoint are expected to be committed,
@@ -155,7 +155,7 @@ public class GlobalCommitterOperatorTest {
public void endOfInput() throws Exception {
        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
createTestHarness(globalCommitter);
testHarness.initializeEmptyState();
testHarness.open();
@@ -166,12 +166,12 @@ public class GlobalCommitterOperatorTest {
assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
}
-    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness()
+    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness()
throws Exception {
return createTestHarness(new DefaultGlobalCommitter());
}
-    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness(
+    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness(
GlobalCommitter<String, String> globalCommitter) throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new GlobalCommitterOperator<>(
@@ -183,7 +183,7 @@ public class GlobalCommitterOperatorTest {
}
public static OperatorSubtaskState buildSubtaskState(
-            OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness,
+            OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness,
List<String> input)
throws Exception {
testHarness.initializeEmptyState();
@@ -199,7 +199,7 @@ public class GlobalCommitterOperatorTest {
return operatorSubtaskState;
}
-    private static List<StreamRecord<CommittableMessage<String>>> committableRecords(
+ static List<StreamRecord<CommittableMessage<String>>> committableRecords(
Collection<String> elements) {
return elements.stream()
.map(GlobalCommitterOperatorTest::toCommittableMessage)
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
new file mode 100644
index 0000000..528f5dc
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.buildSubtaskState;
+import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.committableRecords;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+/** Test the {@link LocalCommitterOperator}. */
+public class LocalCommitterOperatorTest {
+
+ @Test
+ public void supportRetry() throws Exception {
+ final List<String> input = Arrays.asList("lazy", "leaf");
+ final RetryOnceCommitter committer = new RetryOnceCommitter();
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ testHarness.processElements(committableRecords(input));
+ testHarness.prepareSnapshotPreBarrier(1);
+ testHarness.snapshot(1L, 1L);
+ testHarness.notifyOfCompletedCheckpoint(1L);
+ testHarness.snapshot(2L, 2L);
+ testHarness.notifyOfCompletedCheckpoint(2L);
+
+ testHarness.close();
+
+ assertThat(committer.getCommittedData()).contains("lazy", "leaf");
+ }
+
+ @Test
+ public void closeCommitter() throws Exception {
+ final DefaultCommitter committer = new DefaultCommitter();
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+ testHarness.close();
+ assertThat(committer.isClosed()).isTrue();
+ }
+
+ @Test
+ public void restoredFromMergedState() throws Exception {
+ final List<String> input1 = Arrays.asList("today", "whom");
+ final OperatorSubtaskState operatorSubtaskState1 =
+ buildSubtaskState(createTestHarness(), input1);
+
+ final List<String> input2 = Arrays.asList("future", "evil", "how");
+ final OperatorSubtaskState operatorSubtaskState2 =
+ buildSubtaskState(createTestHarness(), input2);
+
+ final DefaultCommitter committer = new DefaultCommitter();
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+
+ final OperatorSubtaskState mergedOperatorSubtaskState =
+ OneInputStreamOperatorTestHarness.repackageState(
+ operatorSubtaskState1, operatorSubtaskState2);
+
+ testHarness.initializeState(
+ OneInputStreamOperatorTestHarness.repartitionOperatorState(
+ mergedOperatorSubtaskState, 2, 2, 1, 0));
+ testHarness.open();
+
+ final List<String> expectedOutput = new ArrayList<>();
+ expectedOutput.addAll(input1);
+ expectedOutput.addAll(input2);
+
+ testHarness.prepareSnapshotPreBarrier(1L);
+ testHarness.snapshot(1L, 1L);
+ testHarness.notifyOfCompletedCheckpoint(1);
+
+ testHarness.close();
+
+ assertThat(committer.getCommittedData())
+                .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
+ }
+
+ @Test
+ public void commitMultipleStagesTogether() throws Exception {
+
+ final DefaultCommitter committer = new DefaultCommitter();
+
+ final List<String> input1 = Arrays.asList("cautious", "nature");
+ final List<String> input2 = Arrays.asList("count", "over");
+ final List<String> input3 = Arrays.asList("lawyer", "grammar");
+
+ final List<String> expectedOutput = new ArrayList<>();
+
+ expectedOutput.addAll(input1);
+ expectedOutput.addAll(input2);
+ expectedOutput.addAll(input3);
+
+ final OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ testHarness = createTestHarness(committer);
+ testHarness.initializeEmptyState();
+ testHarness.open();
+
+ testHarness.processElements(committableRecords(input1));
+ testHarness.prepareSnapshotPreBarrier(1L);
+ testHarness.snapshot(1L, 1L);
+ testHarness.processElements(committableRecords(input2));
+ testHarness.prepareSnapshotPreBarrier(2L);
+ testHarness.snapshot(2L, 2L);
+ testHarness.processElements(committableRecords(input3));
+ testHarness.prepareSnapshotPreBarrier(3L);
+ testHarness.snapshot(3L, 3L);
+
+ testHarness.notifyOfCompletedCheckpoint(1);
+ testHarness.notifyOfCompletedCheckpoint(3);
+
+ testHarness.close();
+
+        assertThat(fromRecords(testHarness.getRecordOutput())).isEqualTo(expectedOutput);
+
+ assertThat(committer.getCommittedData()).isEqualTo(expectedOutput);
+ }
+
+ private static List<String> fromRecords(
+ Collection<StreamRecord<CommittableMessage<String>>> elements) {
+ return elements.stream()
+ .map(StreamRecord::getValue)
+ .filter(message -> message instanceof CommittableWithLineage)
+                .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
+ .collect(Collectors.toList());
+ }
+
+ private OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ createTestHarness() throws Exception {
+ return createTestHarness(new DefaultCommitter());
+ }
+
+ private OneInputStreamOperatorTestHarness<
+ CommittableMessage<String>, CommittableMessage<String>>
+ createTestHarness(Committer<String> committer) throws Exception {
+ return new OneInputStreamOperatorTestHarness<>(
+ new LocalCommitterOperator<>(
+                        () -> committer, () -> StringCommittableSerializer.INSTANCE));
+ }
+
+ /** Base class for testing {@link Committer}. */
+    private static class DefaultCommitter implements Committer<String>, Serializable {
+
+ @Nullable protected Queue<String> committedData;
+
+ private boolean isClosed;
+
+ @Nullable private final Supplier<Queue<String>> queueSupplier;
+
+ public DefaultCommitter() {
+ this.committedData = new ConcurrentLinkedQueue<>();
+ this.isClosed = false;
+ this.queueSupplier = null;
+ }
+
+ public List<String> getCommittedData() {
+ if (committedData != null) {
+ return new ArrayList<>(committedData);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> requests) {
+ if (committedData == null) {
+ assertNotNull(queueSupplier);
+ committedData = queueSupplier.get();
+ }
+ committedData.addAll(
+ requests.stream()
+ .map(CommitRequest::getCommittable)
+ .collect(Collectors.toList()));
+ }
+ }
+
+    /** A {@link Committer} that retries each committable once before committing it. */
+    private static class RetryOnceCommitter extends DefaultCommitter implements Committer<String> {
+
+ private final Set<String> seen = new LinkedHashSet<>();
+
+ @Override
+ public void commit(Collection<CommitRequest<String>> requests) {
+ requests.forEach(
+ c -> {
+ if (seen.remove(c.getCommittable())) {
+ checkNotNull(committedData);
+ committedData.add(c.getCommittable());
+ } else {
+ seen.add(c.getCommittable());
+ c.retryLater();
+ }
+ });
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
new file mode 100644
index 0000000..03c62d6
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.table.store.log.LogSinkProvider.WriteCallback;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAccumulator;
+
+/** A {@link WriteCallback} implementation that tracks the maximum log offset per bucket. */
+public class LogWriteCallback implements WriteCallback {
+
+    private final ConcurrentHashMap<Integer, LongAccumulator> offsetMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void onCompletion(int bucket, long offset) {
+ LongAccumulator acc = offsetMap.get(bucket);
+ if (acc == null) {
+ // computeIfAbsent will lock on the key
+            acc = offsetMap.computeIfAbsent(bucket, k -> new LongAccumulator(Long::max, 0));
+ } // else lock free
+ acc.accumulate(offset);
+ }
+
+ public Map<Integer, Long> offsets() {
+ Map<Integer, Long> offsets = new HashMap<>();
+ offsetMap.forEach((k, v) -> offsets.put(k, v.longValue()));
+ return offsets;
+ }
+}
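Each bucket maps to one LongAccumulator(Long::max, 0), so callbacks that fire concurrently or out of order still converge on the highest offset observed. A small sketch of the observable behavior (offset values are illustrative):

    LogWriteCallback callback = new LogWriteCallback();
    callback.onCompletion(0, 7L); // bucket 0 reaches offset 7
    callback.onCompletion(0, 3L); // late acknowledgement; the max of 7 is kept
    callback.onCompletion(1, 5L);
    Map<Integer, Long> offsets = callback.offsets(); // snapshot: {0=7, 1=5}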
diff --git
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 0fad212..b4b2efc 100644
---
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -63,7 +63,7 @@ import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
/** Utils for the test of {@link KafkaLogStoreFactory}. */
public class KafkaLogTestUtils {
- static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
+ public static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
new LogStoreTableFactory.SourceContext() {
@Override
            public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
@@ -85,7 +85,7 @@ public class KafkaLogTestUtils {
}
};
- static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
+ public static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
new LogStoreTableFactory.SinkContext() {
@Override
@@ -113,7 +113,7 @@ public class KafkaLogTestUtils {
}
};
- static KafkaLogStoreFactory discoverKafkaLogFactory() {
+ public static KafkaLogStoreFactory discoverKafkaLogFactory() {
return (KafkaLogStoreFactory)
LogStoreTableFactory.discoverLogStoreFactory(
Thread.currentThread().getContextClassLoader(),
@@ -169,15 +169,27 @@ public class KafkaLogTestUtils {
LogChangelogMode changelogMode,
LogConsistency consistency,
boolean keyed) {
+ return testContext(
+ name,
+ servers,
+ changelogMode,
+ consistency,
+ RowType.of(new IntType(), new IntType()),
+ keyed ? new int[] {0} : new int[0]);
+ }
+
+ public static DynamicTableFactory.Context testContext(
+ String name,
+ String servers,
+ LogChangelogMode changelogMode,
+ LogConsistency consistency,
+ RowType type,
+ int[] keys) {
Map<String, String> options = new HashMap<>();
options.put(CHANGELOG_MODE.key(), changelogMode.toString());
options.put(CONSISTENCY.key(), consistency.toString());
options.put(BOOTSTRAP_SERVERS.key(), servers);
- return createContext(
- name,
- RowType.of(new IntType(), new IntType()),
- keyed ? new int[] {0} : new int[0],
- options);
+ return createContext(name, type, keys, options);
}
    static SinkRecord testRecord(boolean keyed, int bucket, int key, int value, RowKind rowKind) {