This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 422c409b [FLINK-30211] Introduce CompactorSink for compact jobs in
Table Store
422c409b is described below
commit 422c409b36efcd0e75fdde61b0601f405b18e3ef
Author: tsreaper <[email protected]>
AuthorDate: Wed Dec 14 14:55:05 2022 +0800
[FLINK-30211] Introduce CompactorSink for compact jobs in Table Store
This closes #435.
---
.../connector/sink/CommittableStateManager.java | 37 +++++
.../store/connector/sink/CommitterOperator.java | 60 ++------
.../table/store/connector/sink/CompactorSink.java | 55 +++++++
...kSinkBuilder.java => CompactorSinkBuilder.java} | 43 ++----
.../table/store/connector/sink/FileStoreSink.java | 78 ++++++++++
.../sink/{StoreSink.java => FlinkSink.java} | 102 ++++++-------
.../store/connector/sink/FlinkSinkBuilder.java | 5 +-
.../connector/sink/HashRowStreamPartitioner.java | 71 +++++++++
.../sink/NoopCommittableStateManager.java | 46 ++++++
.../RestoreAndFailCommittableStateManager.java | 95 ++++++++++++
.../store/connector/sink/StoreCompactOperator.java | 2 +-
.../table/store/connector/FileStoreITCase.java | 4 +-
.../connector/sink/CommitterOperatorTest.java | 159 ++++++++++++---------
.../connector/sink/CommitterOperatorTestBase.java | 105 ++++++++++++++
.../store/connector/sink/CompactorSinkITCase.java | 138 ++++++++++++++++++
.../store/connector/sink/SinkSavepointITCase.java | 2 +-
.../connector/source/CompactorSourceITCase.java | 2 +-
.../table/store/table/system/BucketsTable.java | 17 +--
18 files changed, 791 insertions(+), 230 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableStateManager.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableStateManager.java
new file mode 100644
index 00000000..8ef8d5dc
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableStateManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Helper interface for {@link CommitterOperator}. This interface manages
operator states about
+ * {@link org.apache.flink.table.store.file.manifest.ManifestCommittable}.
+ */
+public interface CommittableStateManager extends Serializable {
+
+ void initializeState(StateInitializationContext context, Committer
committer) throws Exception;
+
+ void snapshotState(StateSnapshotContext context, List<ManifestCommittable>
committables)
+ throws Exception;
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index ac55befd..509adac5 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -17,21 +17,15 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.util.function.SerializableFunction;
-import org.apache.flink.util.function.SerializableSupplier;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -44,7 +38,7 @@ import java.util.TreeMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** Committer operator to commit {@link Committable}. */
+/** Operator to commit {@link Committable}s for each snapshot. */
public class CommitterOperator extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Committable, Committable>,
BoundedOneInput {
@@ -69,22 +63,17 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
private final String initialCommitUser;
/** Group the committable by the checkpoint id. */
- private final NavigableMap<Long, ManifestCommittable>
committablesPerCheckpoint;
-
- /** The committable's serializer. */
- private final
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
- committableSerializer;
-
- /** ManifestCommittable state of this job. Used to filter out previous
successful commits. */
- private ListState<ManifestCommittable> streamingCommitterState;
+ protected final NavigableMap<Long, ManifestCommittable>
committablesPerCheckpoint;
private final SerializableFunction<String, Committer> committerFactory;
+ private final CommittableStateManager committableStateManager;
+
/**
* Aggregate committables to global committables and commit the global
committables to the
* external system.
*/
- private Committer committer;
+ protected Committer committer;
private boolean endInput = false;
@@ -92,13 +81,12 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
boolean streamingCheckpointEnabled,
String initialCommitUser,
SerializableFunction<String, Committer> committerFactory,
-
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
- committableSerializer) {
+ CommittableStateManager committableStateManager) {
this.streamingCheckpointEnabled = streamingCheckpointEnabled;
this.initialCommitUser = initialCommitUser;
- this.committableSerializer = committableSerializer;
this.committablesPerCheckpoint = new TreeMap<>();
this.committerFactory = checkNotNull(committerFactory);
+ this.committableStateManager = committableStateManager;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -115,35 +103,7 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
// parallelism of commit operator is always 1, so commitUser will
never be null
committer = committerFactory.apply(commitUser);
- streamingCommitterState =
- new SimpleVersionedListState<>(
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
-
"streaming_committer_raw_states",
-
BytePrimitiveArraySerializer.INSTANCE)),
- committableSerializer.get());
- List<ManifestCommittable> restored = new ArrayList<>();
- streamingCommitterState.get().forEach(restored::add);
- streamingCommitterState.clear();
- commit(true, restored);
- }
-
- private void commit(boolean isRecover, List<ManifestCommittable>
committables)
- throws Exception {
- if (isRecover) {
- committables = committer.filterRecoveredCommittables(committables);
- if (!committables.isEmpty()) {
- committer.commit(committables);
- throw new RuntimeException(
- "This exception is intentionally thrown "
- + "after committing the restored checkpoints. "
- + "By restarting the job we hope that "
- + "writers can start writing based on these
new commits.");
- }
- } else {
- committer.commit(committables);
- }
+ committableStateManager.initializeState(context, committer);
}
private ManifestCommittable toCommittables(long checkpoint,
List<Committable> inputs)
@@ -155,7 +115,7 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
pollInputs();
-
streamingCommitterState.update(committables(committablesPerCheckpoint));
+ committableStateManager.snapshotState(context,
committables(committablesPerCheckpoint));
}
private List<ManifestCommittable> committables(NavigableMap<Long,
ManifestCommittable> map) {
@@ -182,7 +142,7 @@ public class CommitterOperator extends
AbstractStreamOperator<Committable>
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, ManifestCommittable> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
- commit(false, committables(headMap));
+ committer.commit(committables(headMap));
headMap.clear();
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
new file mode 100644
index 00000000..4de06f34
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSink.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.util.function.SerializableFunction;
+
+/** {@link FlinkSink} for stand-alone compact jobs. */
+public class CompactorSink extends FlinkSink {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Lock.Factory lockFactory;
+
+ public CompactorSink(FileStoreTable table, Lock.Factory lockFactory) {
+ super(table, false);
+ this.lockFactory = lockFactory;
+ }
+
+ @Override
+ protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
+ return new StoreCompactOperator(table, writeProvider, isStreaming);
+ }
+
+ @Override
+ protected SerializableFunction<String, Committer> createCommitterFactory(
+ boolean streamingCheckpointEnabled) {
+ return user -> new
StoreCommitter(table.newCommit(user).withLock(lockFactory.create()));
+ }
+
+ @Override
+ protected CommittableStateManager createCommittableStateManager() {
+ return new NoopCommittableStateManager();
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
similarity index 59%
copy from
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
copy to
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
index 9107e19d..aee210ae 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorSinkBuilder.java
@@ -25,62 +25,39 @@ import
org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.table.store.table.system.BucketsTable;
-import javax.annotation.Nullable;
-
-import java.util.Map;
-
-/** Sink builder to build a flink sink from input. */
-public class FlinkSinkBuilder {
+/** Builder for {@link CompactorSink}. */
+public class CompactorSinkBuilder {
private final FileStoreTable table;
private DataStream<RowData> input;
private Lock.Factory lockFactory = Lock.emptyFactory();
- @Nullable private Map<String, String> overwritePartition;
- @Nullable private LogSinkFunction logSinkFunction;
- @Nullable private Integer parallelism;
- public FlinkSinkBuilder(FileStoreTable table) {
+ public CompactorSinkBuilder(FileStoreTable table) {
this.table = table;
}
- public FlinkSinkBuilder withInput(DataStream<RowData> input) {
+ public CompactorSinkBuilder withInput(DataStream<RowData> input) {
this.input = input;
return this;
}
- public FlinkSinkBuilder withLockFactory(Lock.Factory lockFactory) {
+ public CompactorSinkBuilder withLockFactory(Lock.Factory lockFactory) {
this.lockFactory = lockFactory;
return this;
}
- public FlinkSinkBuilder withOverwritePartition(Map<String, String>
overwritePartition) {
- this.overwritePartition = overwritePartition;
- return this;
- }
-
- public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction
logSinkFunction) {
- this.logSinkFunction = logSinkFunction;
- return this;
- }
-
- public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
- this.parallelism = parallelism;
- return this;
- }
-
public DataStreamSink<?> build() {
- BucketStreamPartitioner partitioner = new
BucketStreamPartitioner(table.schema());
+ HashRowStreamPartitioner partitioner =
+ new HashRowStreamPartitioner(
+
BucketsTable.rowType(table.schema().logicalPartitionType()));
PartitionTransformation<RowData> partitioned =
new PartitionTransformation<>(input.getTransformation(),
partitioner);
- if (parallelism != null) {
- partitioned.setParallelism(parallelism);
- }
StreamExecutionEnvironment env = input.getExecutionEnvironment();
- StoreSink sink = new StoreSink(table, lockFactory, overwritePartition,
logSinkFunction);
+ CompactorSink sink = new CompactorSink(table, lockFactory);
return sink.sinkFrom(new DataStream<>(env, partitioned));
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileStoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileStoreSink.java
new file mode 100644
index 00000000..0151f18d
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileStoreSink.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.data.RowData;
+import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.util.function.SerializableFunction;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** {@link FlinkSink} for writing records into table store. */
+public class FileStoreSink extends FlinkSink {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Lock.Factory lockFactory;
+ @Nullable private final Map<String, String> overwritePartition;
+ @Nullable private final LogSinkFunction logSinkFunction;
+
+ public FileStoreSink(
+ FileStoreTable table,
+ Lock.Factory lockFactory,
+ @Nullable Map<String, String> overwritePartition,
+ @Nullable LogSinkFunction logSinkFunction) {
+ super(table, overwritePartition != null);
+ this.lockFactory = lockFactory;
+ this.overwritePartition = overwritePartition;
+ this.logSinkFunction = logSinkFunction;
+ }
+
+ @Override
+ protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
+ return new StoreWriteOperator(table, logSinkFunction, writeProvider);
+ }
+
+ @Override
+ protected SerializableFunction<String, Committer> createCommitterFactory(
+ boolean streamingCheckpointEnabled) {
+ // If checkpoint is enabled for streaming job, we have to
+ // commit new files list even if they're empty.
+ // Otherwise we can't tell if the commit is successful after
+ // a restart.
+ return user ->
+ new StoreCommitter(
+ table.newCommit(user)
+ .withOverwritePartition(overwritePartition)
+
.withCreateEmptyCommit(streamingCheckpointEnabled)
+ .withLock(lockFactory.create()));
+ }
+
+ @Override
+ protected CommittableStateManager createCommittableStateManager() {
+ return new
RestoreAndFailCommittableStateManager(ManifestCommittableSerializer::new);
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
similarity index 62%
rename from
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
rename to
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index 0823e6a0..3e9ee855 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -33,75 +33,46 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import
org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
-import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
+import org.apache.flink.util.function.SerializableFunction;
import java.io.Serializable;
-import java.util.Map;
import java.util.UUID;
-/** Sink of dynamic store. */
-public class StoreSink implements Serializable {
+/** Abstract sink of table store. */
+public abstract class FlinkSink implements Serializable {
private static final long serialVersionUID = 1L;
private static final String WRITER_NAME = "Writer";
-
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
- private final FileStoreTable table;
- private final Lock.Factory lockFactory;
- @Nullable private final Map<String, String> overwritePartition;
- @Nullable private final LogSinkFunction logSinkFunction;
+ protected final FileStoreTable table;
+ private final boolean isOverwrite;
- public StoreSink(
- FileStoreTable table,
- Lock.Factory lockFactory,
- @Nullable Map<String, String> overwritePartition,
- @Nullable LogSinkFunction logSinkFunction) {
+ public FlinkSink(FileStoreTable table, boolean isOverwrite) {
this.table = table;
- this.lockFactory = lockFactory;
- this.overwritePartition = overwritePartition;
- this.logSinkFunction = logSinkFunction;
+ this.isOverwrite = isOverwrite;
}
- private OneInputStreamOperator<RowData, Committable> createWriteOperator(
- String initialCommitUser) {
- boolean isOverwrite = overwritePartition != null;
- StoreSinkWrite.Provider writeProvider;
+ protected StoreSinkWrite.Provider createWriteProvider(String
initialCommitUser) {
if (table.options().changelogProducer() ==
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
long fullCompactionThresholdMs =
table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
- writeProvider =
- (table, context, ioManager) ->
- new FullChangelogStoreSinkWrite(
- table,
- context,
- initialCommitUser,
- ioManager,
- isOverwrite,
- fullCompactionThresholdMs);
+ return (table, context, ioManager) ->
+ new FullChangelogStoreSinkWrite(
+ table,
+ context,
+ initialCommitUser,
+ ioManager,
+ isOverwrite,
+ fullCompactionThresholdMs);
} else {
- writeProvider =
- (table, context, ioManager) ->
- new StoreSinkWriteImpl(
- table, context, initialCommitUser,
ioManager, isOverwrite);
+ return (table, context, ioManager) ->
+ new StoreSinkWriteImpl(
+ table, context, initialCommitUser, ioManager,
isOverwrite);
}
-
- return new StoreWriteOperator(table, logSinkFunction, writeProvider);
- }
-
- private StoreCommitter createCommitter(String user, boolean
createEmptyCommit) {
- return new StoreCommitter(
- table.newCommit(user)
- .withOverwritePartition(overwritePartition)
- .withCreateEmptyCommit(createEmptyCommit)
- .withLock(lockFactory.create()));
}
public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
@@ -116,18 +87,23 @@ public class StoreSink implements Serializable {
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
- CommittableTypeInfo typeInfo = new CommittableTypeInfo();
- SingleOutputStreamOperator<Committable> written =
- input.transform(WRITER_NAME, typeInfo,
createWriteOperator(initialCommitUser))
- .setParallelism(input.getParallelism());
-
+ boolean isStreaming =
+ conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
- conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING
- && checkpointConfig.isCheckpointingEnabled();
+ isStreaming && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
assertCheckpointConfiguration(env);
}
+ CommittableTypeInfo typeInfo = new CommittableTypeInfo();
+ SingleOutputStreamOperator<Committable> written =
+ input.transform(
+ WRITER_NAME,
+ typeInfo,
+ createWriteOperator(
+
createWriteProvider(initialCommitUser), isStreaming))
+ .setParallelism(input.getParallelism());
+
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME,
@@ -135,12 +111,8 @@ public class StoreSink implements Serializable {
new CommitterOperator(
streamingCheckpointEnabled,
initialCommitUser,
- // If checkpoint is enabled for
streaming job, we have to
- // commit new files list even if
they're empty.
- // Otherwise we can't tell if the
commit is successful after
- // a restart.
- user -> createCommitter(user,
streamingCheckpointEnabled),
- ManifestCommittableSerializer::new))
+
createCommitterFactory(streamingCheckpointEnabled),
+ createCommittableStateManager()))
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
@@ -158,4 +130,12 @@ public class StoreSink implements Serializable {
+
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
+ " to exactly-once");
}
+
+ protected abstract OneInputStreamOperator<RowData, Committable>
createWriteOperator(
+ StoreSinkWrite.Provider writeProvider, boolean isStreaming);
+
+ protected abstract SerializableFunction<String, Committer>
createCommitterFactory(
+ boolean streamingCheckpointEnabled);
+
+ protected abstract CommittableStateManager createCommittableStateManager();
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 9107e19d..d93e1577 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
import java.util.Map;
-/** Sink builder to build a flink sink from input. */
+/** Builder for {@link FileStoreSink}. */
public class FlinkSinkBuilder {
private final FileStoreTable table;
@@ -80,7 +80,8 @@ public class FlinkSinkBuilder {
}
StreamExecutionEnvironment env = input.getExecutionEnvironment();
- StoreSink sink = new StoreSink(table, lockFactory, overwritePartition,
logSinkFunction);
+ FileStoreSink sink =
+ new FileStoreSink(table, lockFactory, overwritePartition,
logSinkFunction);
return sink.sinkFrom(new DataStream<>(env, partitioned));
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
new file mode 100644
index 00000000..f69443a0
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/HashRowStreamPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link StreamPartitioner} to partition {@link RowData} according to its
hash value. */
+public class HashRowStreamPartitioner extends StreamPartitioner<RowData> {
+
+ private final RowType rowType;
+
+ private transient RowDataSerializer serializer;
+
+ public HashRowStreamPartitioner(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void setup(int numberOfChannels) {
+ super.setup(numberOfChannels);
+ serializer = new RowDataSerializer(rowType);
+ }
+
+ @Override
+ public int selectChannel(SerializationDelegate<StreamRecord<RowData>>
record) {
+ int hash =
serializer.toBinaryRow(record.getInstance().getValue()).hashCode();
+ return Math.abs(hash) % numberOfChannels;
+ }
+
+ @Override
+ public StreamPartitioner<RowData> copy() {
+ return this;
+ }
+
+ @Override
+ public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
+ return SubtaskStateMapper.FULL;
+ }
+
+ @Override
+ public boolean isPointwise() {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "compactor-stream-partitioner";
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/NoopCommittableStateManager.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/NoopCommittableStateManager.java
new file mode 100644
index 00000000..2f45f998
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/NoopCommittableStateManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.util.List;
+
+/**
+ * A {@link CommittableStateManager} which does nothing. If a commit attempt
fails, it will be lost
+ * after the job restarts.
+ *
+ * <p>Useful for committing optional snapshots. For example COMPACT snapshots
produced by a separate
+ * compact job.
+ */
+public class NoopCommittableStateManager implements CommittableStateManager {
+
+ @Override
+ public void initializeState(StateInitializationContext context, Committer
committer)
+ throws Exception {
+ // nothing to do
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context,
List<ManifestCommittable> committables)
+ throws Exception {
+ // nothing to do
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/RestoreAndFailCommittableStateManager.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/RestoreAndFailCommittableStateManager.java
new file mode 100644
index 00000000..524ad999
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/RestoreAndFailCommittableStateManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link CommittableStateManager} which stores uncommitted {@link
ManifestCommittable}s in state.
+ *
+ * <p>When the job restarts, these {@link ManifestCommittable}s will be
restored and committed, then
+ * an intended failure will occur, hoping that after the job restarts, all
writers can start writing
+ * based on the restored snapshot.
+ *
+ * <p>Useful for committing snapshots containing records. For example
snapshots produced by table
+ * store writers.
+ */
+public class RestoreAndFailCommittableStateManager implements
CommittableStateManager {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The committable's serializer. */
+ private final
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+ committableSerializer;
+
+ /** ManifestCommittable state of this job. Used to filter out previous
successful commits. */
+ private ListState<ManifestCommittable> streamingCommitterState;
+
+ public RestoreAndFailCommittableStateManager(
+
SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+ committableSerializer) {
+ this.committableSerializer = committableSerializer;
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context, Committer
committer)
+ throws Exception {
+ streamingCommitterState =
+ new SimpleVersionedListState<>(
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+
"streaming_committer_raw_states",
+
BytePrimitiveArraySerializer.INSTANCE)),
+ committableSerializer.get());
+ List<ManifestCommittable> restored = new ArrayList<>();
+ streamingCommitterState.get().forEach(restored::add);
+ streamingCommitterState.clear();
+ recover(restored, committer);
+ }
+
+ private void recover(List<ManifestCommittable> committables, Committer
committer)
+ throws Exception {
+ committables = committer.filterRecoveredCommittables(committables);
+ if (!committables.isEmpty()) {
+ committer.commit(committables);
+ throw new RuntimeException(
+ "This exception is intentionally thrown "
+ + "after committing the restored checkpoints. "
+ + "By restarting the job we hope that "
+ + "writers can start writing based on these new
commits.");
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context,
List<ManifestCommittable> committables)
+ throws Exception {
+ streamingCommitterState.update(committables);
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 2cdfad51..9ac2957a 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -55,7 +55,7 @@ public class StoreCompactOperator extends
PrepareCommitOperator {
Preconditions.checkArgument(
!table.options().writeCompactionSkip(),
CoreOptions.WRITE_COMPACTION_SKIP.key()
- + " should not be true for StoreCompactOperator. This
is unexpected.");
+ + " should not be true for StoreCompactOperator.");
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.isStreaming = isStreaming;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index b4e8bb72..86d5dab0 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -33,8 +33,8 @@ import
org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.FileStoreSink;
import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
-import org.apache.flink.table.store.connector.sink.StoreSink;
import org.apache.flink.table.store.connector.source.ContinuousFileStoreSource;
import org.apache.flink.table.store.connector.source.FlinkSourceBuilder;
import org.apache.flink.table.store.connector.source.StaticFileStoreSource;
@@ -80,7 +80,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource}
and {@link
- * StoreSink}.
+ * FileStoreSink}.
*/
@RunWith(Parameterized.class)
public class FileStoreITCase extends AbstractTestBase {
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
index f176132d..075dd1d6 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTest.java
@@ -20,68 +20,44 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/** Tests for {@link CommitterOperator}. */
-public class CommitterOperatorTest {
+public class CommitterOperatorTest extends CommitterOperatorTestBase {
- private static final RowType ROW_TYPE =
- RowType.of(
- new LogicalType[] {
- DataTypes.INT().getLogicalType(),
DataTypes.BIGINT().getLogicalType()
- },
- new String[] {"a", "b"});
-
- @TempDir public java.nio.file.Path tempDir;
- private Path tablePath;
private String initialCommitUser;
@BeforeEach
public void before() {
- tablePath = new Path(tempDir.toString());
+ super.before();
initialCommitUser = UUID.randomUUID().toString();
}
+ // ------------------------------------------------------------------------
+ // Recoverable operator tests
+ // ------------------------------------------------------------------------
+
@Test
public void testFailIntentionallyAfterRestore() throws Exception {
FileStoreTable table = createFileStoreTable();
OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
- createTestHarness(table);
+ createRecoverableTestHarness(table);
testHarness.open();
TableWrite write = table.newWrite(initialCommitUser);
@@ -97,7 +73,7 @@ public class CommitterOperatorTest {
OperatorSubtaskState snapshot = testHarness.snapshot(0, timestamp++);
assertThat(table.snapshotManager().latestSnapshotId()).isNull();
- testHarness = createTestHarness(table);
+ testHarness = createRecoverableTestHarness(table);
try {
// commit snapshot from state, fail intentionally
testHarness.initializeState(snapshot);
@@ -114,7 +90,7 @@ public class CommitterOperatorTest {
assertResults(table, "1, 10", "2, 20");
// snapshot is successfully committed, no failure is needed
- testHarness = createTestHarness(table);
+ testHarness = createRecoverableTestHarness(table);
testHarness.initializeState(snapshot);
testHarness.open();
assertResults(table, "1, 10", "2, 20");
@@ -124,7 +100,7 @@ public class CommitterOperatorTest {
public void testCheckpointAbort() throws Exception {
FileStoreTable table = createFileStoreTable();
OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
- createTestHarness(table);
+ createRecoverableTestHarness(table);
testHarness.open();
// files from multiple checkpoint
@@ -151,53 +127,94 @@ public class CommitterOperatorTest {
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(cpId);
}
- private void assertResults(FileStoreTable table, String... expected) {
- TableRead read = table.newRead();
- List<String> actual = new ArrayList<>();
- table.newScan()
- .plan()
- .splits
- .forEach(
- s -> {
- try {
- RecordReader<RowData> recordReader =
read.createReader(s);
- CloseableIterator<RowData> it =
- new
RecordReaderIterator<>(recordReader);
- while (it.hasNext()) {
- RowData row = it.next();
- actual.add(row.getInt(0) + ", " +
row.getLong(1));
- }
- it.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- Collections.sort(actual);
- assertThat(actual).isEqualTo(Arrays.asList(expected));
+ // ------------------------------------------------------------------------
+ // Lossy operator tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testSnapshotLostWhenFailed() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createLossyTestHarness(table);
+ testHarness.open();
+
+ long timestamp = 1;
+
+ // this checkpoint is notified, should be committed
+ TableWrite write = table.newWrite(initialCommitUser);
+ write.write(GenericRowData.of(1, 10L));
+ write.write(GenericRowData.of(2, 20L));
+ for (FileCommittable committable : write.prepareCommit(false, 1)) {
+ testHarness.processElement(
+ new Committable(1, Committable.Kind.FILE, committable),
timestamp++);
+ }
+ testHarness.snapshot(1, timestamp++);
+ testHarness.notifyOfCompletedCheckpoint(1);
+
+ // this checkpoint is not notified, should not be committed
+ write.write(GenericRowData.of(3, 30L));
+ write.write(GenericRowData.of(4, 40L));
+ for (FileCommittable committable : write.prepareCommit(false, 2)) {
+ testHarness.processElement(
+ new Committable(2, Committable.Kind.FILE, committable),
timestamp++);
+ }
+ OperatorSubtaskState snapshot = testHarness.snapshot(2, timestamp++);
+
+ // reopen test harness
+ write.close();
+ testHarness.close();
+
+ testHarness = createLossyTestHarness(table);
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ // this checkpoint is notified, should be committed
+ write = table.newWrite(initialCommitUser);
+ write.write(GenericRowData.of(5, 50L));
+ write.write(GenericRowData.of(6, 60L));
+ for (FileCommittable committable : write.prepareCommit(false, 3)) {
+ testHarness.processElement(
+ new Committable(3, Committable.Kind.FILE, committable),
timestamp++);
+ }
+ testHarness.snapshot(3, timestamp++);
+ testHarness.notifyOfCompletedCheckpoint(3);
+
+ write.close();
+ testHarness.close();
+
+ assertResults(table, "1, 10", "2, 20", "5, 50", "6, 60");
}
- private FileStoreTable createFileStoreTable() throws Exception {
- Configuration conf = new Configuration();
- conf.set(CoreOptions.PATH, tablePath.toString());
- SchemaManager schemaManager = new SchemaManager(tablePath);
- schemaManager.commitNewVersion(
- new UpdateSchema(
- ROW_TYPE,
- Collections.emptyList(),
- Collections.emptyList(),
- conf.toMap(),
- ""));
- return FileStoreTableFactory.create(conf);
+ // ------------------------------------------------------------------------
+ // Test utils
+ // ------------------------------------------------------------------------
+
+ private OneInputStreamOperatorTestHarness<Committable, Committable>
+ createRecoverableTestHarness(FileStoreTable table) throws
Exception {
+ CommitterOperator operator =
+ new CommitterOperator(
+ true,
+ initialCommitUser,
+ user -> new StoreCommitter(table.newCommit(user)),
+ new RestoreAndFailCommittableStateManager(
+ ManifestCommittableSerializer::new));
+ return createTestHarness(operator);
}
- private OneInputStreamOperatorTestHarness<Committable, Committable>
createTestHarness(
+ private OneInputStreamOperatorTestHarness<Committable, Committable>
createLossyTestHarness(
FileStoreTable table) throws Exception {
CommitterOperator operator =
new CommitterOperator(
true,
initialCommitUser,
user -> new StoreCommitter(table.newCommit(user)),
- ManifestCommittableSerializer::new);
+ new NoopCommittableStateManager());
+ return createTestHarness(operator);
+ }
+
+ private OneInputStreamOperatorTestHarness<Committable, Committable>
createTestHarness(
+ CommitterOperator operator) throws Exception {
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
OneInputStreamOperatorTestHarness<Committable, Committable> harness =
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTestBase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTestBase.java
new file mode 100644
index 00000000..01ed9563
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommitterOperatorTestBase.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base test class for {@link CommitterOperator} with different {@link
CommittableStateManager}s.
+ */
+public abstract class CommitterOperatorTestBase {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"a", "b"});
+
+ @TempDir public java.nio.file.Path tempDir;
+ protected Path tablePath;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(tempDir.toString());
+ }
+
+ protected void assertResults(FileStoreTable table, String... expected) {
+ TableRead read = table.newRead();
+ List<String> actual = new ArrayList<>();
+ table.newScan()
+ .plan()
+ .splits
+ .forEach(
+ s -> {
+ try {
+ RecordReader<RowData> recordReader =
read.createReader(s);
+ CloseableIterator<RowData> it =
+ new
RecordReaderIterator<>(recordReader);
+ while (it.hasNext()) {
+ RowData row = it.next();
+ actual.add(row.getInt(0) + ", " +
row.getLong(1));
+ }
+ it.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ Collections.sort(actual);
+ assertThat(actual).isEqualTo(Arrays.asList(expected));
+ }
+
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return FileStoreTableFactory.create(conf);
+ }
+}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
new file mode 100644
index 00000000..774074cb
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CompactorSinkITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+/** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
+public class CompactorSinkITCase extends AbstractTestBase {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType()
+ },
+ new String[] {"dt", "hh", "k", "v"});
+
+ private Path tablePath;
+ private String commitUser;
+
+ @Before
+ public void before() throws IOException {
+ tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
+ commitUser = UUID.randomUUID().toString();
+ }
+
+ @Test
+ public void testCompact() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ TableWrite write = table.newWrite(commitUser);
+ TableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(20221208, 15, 1, 100));
+ write.write(rowData(20221208, 16, 1, 100));
+ write.write(rowData(20221209, 15, 1, 100));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(20221208, 15, 2, 200));
+ write.write(rowData(20221208, 16, 2, 200));
+ write.write(rowData(20221209, 15, 2, 200));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ Assert.assertEquals(2, snapshot.id());
+ Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+ write.close();
+ commit.close();
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ DataStreamSource<RowData> source =
+ env.fromElements(rowData(20221208, 15, 0), rowData(20221209,
15, 0));
+ new CompactorSinkBuilder(table).withInput(source).build();
+ env.execute();
+
+ snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ Assert.assertEquals(3, snapshot.id());
+ Assert.assertEquals(Snapshot.CommitKind.COMPACT,
snapshot.commitKind());
+
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ Assert.assertEquals(3, plan.splits().size());
+ for (DataSplit split : plan.splits) {
+ if (split.partition().getInt(1) == 15) {
+ // compacted
+ Assert.assertEquals(1, split.files().size());
+ } else {
+ // not compacted
+ Assert.assertEquals(2, split.files().size());
+ }
+ }
+ }
+
+ private GenericRowData rowData(Object... values) {
+ return GenericRowData.of(values);
+ }
+
+ private FileStoreTable createFileStoreTable() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ TableSchema tableSchema =
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ Collections.emptyMap(),
+ ""));
+ return FileStoreTableFactory.create(tablePath, tableSchema);
+ }
+}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
index 1aa4546f..bb94a6ee 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -53,7 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-/** IT cases for {@link StoreSink} when writing file store and with
savepoints. */
+/** IT cases for {@link FileStoreSink} when writing file store and with
savepoints. */
public class SinkSavepointITCase extends AbstractTestBase {
private String path;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
index ed68276c..f7ebba03 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
@@ -52,7 +52,7 @@ import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link CompactorSourceBuilder}. */
+/** IT cases for {@link CompactorSourceBuilder}. */
public class CompactorSourceITCase extends AbstractTestBase {
private static final RowType ROW_TYPE =
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
index 8f6a22fb..b94c3c32 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
@@ -53,16 +53,9 @@ public class BucketsTable implements DataTable {
private static final long serialVersionUID = 1L;
private final FileStoreTable wrapped;
- private final RowType rowType;
public BucketsTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
-
- RowType partitionType = wrapped.schema().logicalPartitionType();
- List<RowType.RowField> fields = new
ArrayList<>(partitionType.getFields());
- // same with ManifestEntry.schema
- fields.add(new RowType.RowField("_BUCKET", new IntType()));
- this.rowType = new RowType(fields);
}
@Override
@@ -82,7 +75,15 @@ public class BucketsTable implements DataTable {
@Override
public RowType rowType() {
- return rowType;
+ RowType partitionType = wrapped.schema().logicalPartitionType();
+ return rowType(partitionType);
+ }
+
+ public static RowType rowType(RowType partitionType) {
+ List<RowType.RowField> fields = new
ArrayList<>(partitionType.getFields());
+ // same with ManifestEntry.schema
+ fields.add(new RowType.RowField("_BUCKET", new IntType()));
+ return new RowType(fields);
}
@Override