This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 38785e0 [FLINK-26530] Introduce TableStore API and refactor ITCases
38785e0 is described below
commit 38785e01bfdc58b05e4d6596d43757386bf5a108
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 11 11:56:41 2022 +0800
[FLINK-26530] Introduce TableStore API and refactor ITCases
This closes #36
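For orientation, a minimal usage sketch of the new API, assembled from the builders added in TableStore.java and the refactored ITCases in this diff. The helper method, its parameters, and the column indices are illustrative placeholders, not part of this commit.

    /** Sketch: write a DataStream through the store sink, then read the data back. */
    static void roundTrip(
            StreamExecutionEnvironment env,
            Configuration options,
            RowType rowType,
            DataStream<RowData> input) throws Exception {
        TableStore store =
                new TableStore(options)
                        .withSchema(rowType)
                        .withPartitions(new int[] {1})  // index of the partition column
                        .withPrimaryKeys(new int[] {2}) // index of the primary key column
                        .withTableIdentifier(ObjectIdentifier.of("catalog", "db", "t"));

        // Write: the sink builder partitions the input by bucket and attaches the store sink.
        store.sinkBuilder().withInput(input).build();
        env.execute();

        // Read: the source builder wraps the bounded file store source into a DataStream.
        DataStreamSource<RowData> source = store.sourceBuilder().build(env);
        try (CloseableIterator<RowData> it = source.executeAndCollect()) {
            it.forEachRemaining(row -> {});
        }
    }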
---
.../flink/table/store/connector/TableStore.java | 279 +++++++++++++++++++++
.../connector/sink/BucketStreamPartitioner.java | 80 ++++++
.../store/connector/source/FileStoreSource.java | 38 +--
.../source/FileStoreSourceSplitGenerator.java | 4 -
.../connector/source/LogHybridSourceFactory.java | 52 ++++
.../connector/source/PendingSplitsCheckpoint.java | 22 +-
.../source/PendingSplitsCheckpointSerializer.java | 6 +-
.../source/StaticFileStoreSplitEnumerator.java | 20 +-
.../table/store/connector/FileStoreITCase.java | 202 +++++++--------
.../store/connector/sink/LogStoreSinkITCase.java | 134 +++++-----
.../PendingSplitsCheckpointSerializerTest.java | 12 +-
.../source/StaticFileStoreSplitEnumeratorTest.java | 3 +-
.../table/store/file/operation/FileStoreScan.java | 3 +
.../store/file/operation/FileStoreScanImpl.java | 8 +-
.../table/store/sink/SinkRecordConverter.java | 14 +-
15 files changed, 636 insertions(+), 241 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
new file mode 100644
index 0000000..609cee9
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
+import org.apache.flink.table.store.connector.sink.StoreSink;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
+import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
+import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.ValueCountAccumulator;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.store.utils.ProjectionUtils;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+
+/** A table store API to create sources and sinks. */
+@Experimental
+public class TableStore {
+
+ private final Configuration options;
+
+ /** commit user, default uuid. */
+ private String user = UUID.randomUUID().toString();
+
+ /** partition keys, default no partition. */
+ private int[] partitions = new int[0];
+
+ /** primary keys, default no key. */
+ private int[] primaryKeys = new int[0];
+
+ private RowType type;
+
+ private ObjectIdentifier tableIdentifier;
+
+ public TableStore(Configuration options) {
+ this.options = options;
+ }
+
+ public TableStore withUser(String user) {
+ this.user = user;
+ return this;
+ }
+
+ public TableStore withSchema(RowType type) {
+ this.type = type;
+ return this;
+ }
+
+ public TableStore withPartitions(int[] partitions) {
+ this.partitions = partitions;
+ return this;
+ }
+
+ public TableStore withPrimaryKeys(int[] primaryKeys) {
+ this.primaryKeys = primaryKeys;
+ return this;
+ }
+
+ public TableStore withTableIdentifier(ObjectIdentifier tableIdentifier) {
+ this.tableIdentifier = tableIdentifier;
+ return this;
+ }
+
+ public SourceBuilder sourceBuilder() {
+ return new SourceBuilder();
+ }
+
+ public SinkBuilder sinkBuilder() {
+ return new SinkBuilder();
+ }
+
+ private FileStore buildFileStore() {
+ RowType partitionType = ProjectionUtils.project(type, partitions);
+ RowType keyType;
+ RowType valueType;
+ Accumulator accumulator;
+ if (primaryKeys.length == 0) {
+ keyType = type;
+ valueType = RowType.of(new BigIntType(false));
+ accumulator = new ValueCountAccumulator();
+ } else {
+            List<RowType.RowField> fields = ProjectionUtils.project(type, primaryKeys).getFields();
+            // add _KEY_ prefix to avoid conflicts with value fields
+ keyType =
+ new RowType(
+ fields.stream()
+ .map(
+ f ->
+ new RowType.RowField(
+                                                            "_KEY_" + f.getName(),
+                                                            f.getType(),
+                                                            f.getDescription().orElse(null)))
+ .collect(Collectors.toList()));
+ valueType = type;
+ accumulator = new DeduplicateAccumulator();
+ }
+        return new FileStoreImpl(options, user, partitionType, keyType, valueType, accumulator);
+ }
+
+    /** Source builder to build a Flink {@link Source}. */
+ public class SourceBuilder {
+
+ private boolean isContinuous = false;
+
+ private boolean isHybrid = true;
+
+ @Nullable private int[][] projectedFields;
+
+ @Nullable private Predicate predicate;
+
+ @Nullable private LogSourceProvider logSourceProvider;
+
+ public SourceBuilder withProjection(int[][] projectedFields) {
+ this.projectedFields = projectedFields;
+ return this;
+ }
+
+ public SourceBuilder withPredicate(Predicate predicate) {
+ this.predicate = predicate;
+ return this;
+ }
+
+ public SourceBuilder withContinuousMode(boolean isContinuous) {
+ this.isContinuous = isContinuous;
+ return this;
+ }
+
+ public SourceBuilder withHybridMode(boolean isHybrid) {
+ this.isHybrid = isHybrid;
+ return this;
+ }
+
+        public SourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
+ this.logSourceProvider = logSourceProvider;
+ return this;
+ }
+
+ private FileStoreSource buildFileStoreSource() {
+ FileStore fileStore = buildFileStore();
+ return new FileStoreSource(
+                    fileStore, primaryKeys.length == 0, projectedFields, predicate);
+ }
+
+ public Source<RowData, ?, ?> build() {
+ if (isContinuous) {
+ if (logSourceProvider == null) {
+ throw new UnsupportedOperationException(
+                            "File store continuous mode is not supported yet.");
+ }
+
+ // TODO project log source
+
+ if (isHybrid) {
+                    return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
+                            buildFileStoreSource())
+                            .addSource(
+                                    new LogHybridSourceFactory(logSourceProvider),
+ Boundedness.CONTINUOUS_UNBOUNDED)
+ .build();
+ } else {
+ return logSourceProvider.createSource(null);
+ }
+ } else {
+ return buildFileStoreSource();
+ }
+ }
+
+        public DataStreamSource<RowData> build(StreamExecutionEnvironment env) {
+ return env.fromSource(
+ build(),
+ WatermarkStrategy.noWatermarks(),
+ tableIdentifier.asSummaryString(),
+ InternalTypeInfo.of(type));
+ }
+ }
+
+    /** Sink builder to build a Flink sink from input. */
+ public class SinkBuilder {
+
+ private DataStream<RowData> input;
+
+ @Nullable private CatalogLock.Factory lockFactory;
+
+ @Nullable private Map<String, String> overwritePartition;
+
+ @Nullable private LogSinkProvider logSinkProvider;
+
+ public SinkBuilder withInput(DataStream<RowData> input) {
+ this.input = input;
+ return this;
+ }
+
+ public SinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
+ this.lockFactory = lockFactory;
+ return this;
+ }
+
+        public SinkBuilder withOverwritePartition(Map<String, String> overwritePartition) {
+ this.overwritePartition = overwritePartition;
+ return this;
+ }
+
+        public SinkBuilder withLogSinkProvider(LogSinkProvider logSinkProvider) {
+ this.logSinkProvider = logSinkProvider;
+ return this;
+ }
+
+ public DataStreamSink<?> build() {
+ FileStore fileStore = buildFileStore();
+ int numBucket = options.get(BUCKET);
+
+ BucketStreamPartitioner partitioner =
+                    new BucketStreamPartitioner(numBucket, type, partitions, primaryKeys);
+            DataStream<RowData> partitioned =
+                    new DataStream<>(
+                            input.getExecutionEnvironment(),
+                            new PartitionTransformation<>(input.getTransformation(), partitioner));
+
+ StoreSink<?, ?> sink =
+ new StoreSink<>(
+ tableIdentifier,
+ fileStore,
+ partitions,
+ primaryKeys,
+ numBucket,
+ lockFactory,
+ overwritePartition,
+ logSinkProvider);
+ return GlobalCommittingSinkTranslator.translate(partitioned, sink);
+ }
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
new file mode 100644
index 0000000..a20253b
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.sink.SinkRecordConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+/** A {@link StreamPartitioner} to partition records by bucket. */
+public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
+
+ private final int numBucket;
+ private final RowType inputType;
+ private final int[] partitions;
+ private final int[] primaryKeys;
+
+ private transient SinkRecordConverter recordConverter;
+
+ public BucketStreamPartitioner(
+            int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+ this.numBucket = numBucket;
+ this.inputType = inputType;
+ this.partitions = partitions;
+ this.primaryKeys = primaryKeys;
+ }
+
+ @Override
+ public void setup(int numberOfChannels) {
+ super.setup(numberOfChannels);
+ this.recordConverter =
+                new SinkRecordConverter(numBucket, inputType, partitions, primaryKeys);
+ }
+
+ @Override
+    public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
+ RowData row = record.getInstance().getValue();
+ int bucket = recordConverter.bucket(row, recordConverter.key(row));
+ return bucket % numberOfChannels;
+ }
+
+ @Override
+ public StreamPartitioner<RowData> copy() {
+ return this;
+ }
+
+ @Override
+ public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
+ return SubtaskStateMapper.FULL;
+ }
+
+ @Override
+ public boolean isPointwise() {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "bucket-assigner";
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index ce36204..b595df5 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -26,13 +26,14 @@ import
org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import javax.annotation.Nullable;
-import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
/** {@link Source} of file store. */
public class FileStoreSource
@@ -46,21 +47,17 @@ public class FileStoreSource
@Nullable private final int[][] projectedFields;
- @Nullable private final Predicate partitionPredicate;
-
- @Nullable private final Predicate fieldsPredicate;
+ @Nullable private final Predicate predicate;
public FileStoreSource(
FileStore fileStore,
boolean valueCountMode,
@Nullable int[][] projectedFields,
- @Nullable Predicate partitionPredicate,
- @Nullable Predicate fieldsPredicate) {
+ @Nullable Predicate predicate) {
this.fileStore = fileStore;
this.valueCountMode = valueCountMode;
this.projectedFields = projectedFields;
- this.partitionPredicate = partitionPredicate;
- this.fieldsPredicate = fieldsPredicate;
+ this.predicate = predicate;
}
@Override
@@ -74,6 +71,7 @@ public class FileStoreSource
FileStoreRead read = fileStore.newRead();
if (projectedFields != null) {
if (valueCountMode) {
+                // TODO don't project keys, and add key projection to split reader
read.withKeyProjection(projectedFields);
} else {
read.withValueProjection(projectedFields);
@@ -86,15 +84,14 @@ public class FileStoreSource
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint>
createEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
FileStoreScan scan = fileStore.newScan();
- if (partitionPredicate != null) {
- scan.withPartitionFilter(partitionPredicate);
- }
- if (fieldsPredicate != null) {
- if (valueCountMode) {
- scan.withKeyFilter(fieldsPredicate);
- } else {
- scan.withValueFilter(fieldsPredicate);
- }
+ if (predicate != null) {
+ // TODO split predicate into partitionPredicate and fieldsPredicate
+ // scan.withPartitionFilter(partitionPredicate);
+ // if (keyAsRecord) {
+ // scan.withKeyFilter(fieldsPredicate);
+ // } else {
+ // scan.withValueFilter(fieldsPredicate);
+ // }
}
return new StaticFileStoreSplitEnumerator(context, scan);
}
@@ -103,8 +100,11 @@ public class FileStoreSource
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
- checkArgument(checkpoint.nextSnapshotId() == -1);
-        return new StaticFileStoreSplitEnumerator(context, checkpoint.splits());
+ Snapshot snapshot = null;
+ if (checkpoint.currentSnapshotId() != INVALID_SNAPSHOT) {
+            snapshot = fileStore.newScan().snapshot(checkpoint.currentSnapshotId());
+        }
+        return new StaticFileStoreSplitEnumerator(context, snapshot, checkpoint.splits());
}
@Override
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
index 217119d..f9b64ab 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGenerator.java
@@ -35,10 +35,6 @@ public class FileStoreSourceSplitGenerator {
*/
private final char[] currentId = "0000000000".toCharArray();
- public List<FileStoreSourceSplit> createSplits(FileStoreScan scan) {
- return createSplits(scan.plan());
- }
-
public List<FileStoreSourceSplit> createSplits(FileStoreScan.Plan plan) {
return plan.groupByPartFiles().entrySet().stream()
.flatMap(
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
new file mode 100644
index 0000000..0f892fc
--- /dev/null
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/LogHybridSourceFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.connector.base.source.hybrid.HybridSource.SourceFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.log.LogSourceProvider;
+
+import java.util.Map;
+
+/** Log {@link SourceFactory} from {@link StaticFileStoreSplitEnumerator}. */
+public class LogHybridSourceFactory
+        implements SourceFactory<RowData, Source<RowData, ?, ?>, StaticFileStoreSplitEnumerator> {
+
+ private final LogSourceProvider provider;
+
+ public LogHybridSourceFactory(LogSourceProvider provider) {
+ this.provider = provider;
+ }
+
+ @Override
+ public Source<RowData, ?, ?> create(
+            HybridSource.SourceSwitchContext<StaticFileStoreSplitEnumerator> context) {
+        StaticFileStoreSplitEnumerator enumerator = context.getPreviousEnumerator();
+ Snapshot snapshot = enumerator.snapshot();
+ Map<Integer, Long> logOffsets = null;
+ if (snapshot != null) {
+ // TODO
+ // logOffsets = snapshot.getLogOffsets();
+ }
+ return provider.createSource(logOffsets);
+ }
+}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
index 666cdf8..10c2b23 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpoint.java
@@ -26,30 +26,24 @@ import java.util.Collection;
*/
public class PendingSplitsCheckpoint {
+ public static final long INVALID_SNAPSHOT = -1L;
+
/** The splits in the checkpoint. */
private final Collection<FileStoreSourceSplit> splits;
- private final long nextSnapshotId;
+ private final long currentSnapshotId;
-    private PendingSplitsCheckpoint(Collection<FileStoreSourceSplit> splits, long nextSnapshotId) {
+ public PendingSplitsCheckpoint(
+ Collection<FileStoreSourceSplit> splits, long currentSnapshotId) {
this.splits = splits;
- this.nextSnapshotId = nextSnapshotId;
+ this.currentSnapshotId = currentSnapshotId;
}
public Collection<FileStoreSourceSplit> splits() {
return splits;
}
- public long nextSnapshotId() {
- return nextSnapshotId;
- }
-
-    public static PendingSplitsCheckpoint fromStatic(Collection<FileStoreSourceSplit> splits) {
- return new PendingSplitsCheckpoint(splits, -1);
- }
-
- public static PendingSplitsCheckpoint fromContinuous(
- Collection<FileStoreSourceSplit> splits, long nextSnapshotId) {
- return new PendingSplitsCheckpoint(splits, nextSnapshotId);
+ public long currentSnapshotId() {
+ return currentSnapshotId;
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
index b38748e..b3df0ac 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java
@@ -52,7 +52,7 @@ public class PendingSplitsCheckpointSerializer
view.writeInt(bytes.length);
view.write(bytes);
}
- view.writeLong(pendingSplitsCheckpoint.nextSnapshotId());
+ view.writeLong(pendingSplitsCheckpoint.currentSnapshotId());
return out.toByteArray();
}
@@ -67,7 +67,7 @@ public class PendingSplitsCheckpointSerializer
view.readFully(bytes);
splits.add(splitSerializer.deserialize(version, bytes));
}
- long nextSnapshotId = view.readLong();
- return PendingSplitsCheckpoint.fromContinuous(splits, nextSnapshotId);
+ long currentSnapshotId = view.readLong();
+ return new PendingSplitsCheckpoint(splits, currentSnapshotId);
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 6a884b2..4fc828b 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import javax.annotation.Nullable;
@@ -30,25 +31,34 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
+
/** A SplitEnumerator implementation for bounded / batch {@link FileStoreSource} input. */
public class StaticFileStoreSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
private final SplitEnumeratorContext<FileStoreSourceSplit> context;
+ @Nullable private final Snapshot snapshot;
+
private final Queue<FileStoreSourceSplit> splits;
public StaticFileStoreSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
+ Snapshot snapshot,
Collection<FileStoreSourceSplit> splits) {
this.context = context;
+ this.snapshot = snapshot;
this.splits = new LinkedList<>(splits);
}
public StaticFileStoreSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
FileStoreScan scan) {
this.context = context;
-        this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(scan));
+ FileStoreScan.Plan plan = scan.plan();
+ Long snapshotId = plan.snapshotId();
+ this.snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
+        this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(plan));
}
@Override
@@ -83,11 +93,17 @@ public class StaticFileStoreSplitEnumerator
@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
- return PendingSplitsCheckpoint.fromStatic(new ArrayList<>(splits));
+ return new PendingSplitsCheckpoint(
+                new ArrayList<>(splits), snapshot == null ? INVALID_SNAPSHOT : snapshot.id());
}
@Override
public void close() {
// no resources to close
}
+
+ @Nullable
+ public Snapshot snapshot() {
+ return snapshot;
+ }
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 221e85c..7c4a884 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -19,11 +19,10 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -32,13 +31,8 @@ import
org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.connector.sink.StoreSink;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
import org.apache.flink.table.store.connector.source.FileStoreSource;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.FileStoreImpl;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
-import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -49,18 +43,19 @@ import org.apache.flink.util.CloseableIterator;
import org.junit.Assume;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import javax.annotation.Nullable;
-
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
@@ -71,13 +66,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@RunWith(Parameterized.class)
public class FileStoreITCase extends AbstractTestBase {
- private static final RowType PARTITION_TYPE =
-            new RowType(Collections.singletonList(new RowType.RowField("p", new VarCharType())));
-
-    private static final RowType KEY_TYPE =
-            new RowType(Collections.singletonList(new RowType.RowField("k", new IntType())));
-
- public static final RowType VALUE_TYPE =
+ public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new RowType.RowField("v", new IntType()),
@@ -89,14 +78,15 @@ public class FileStoreITCase extends AbstractTestBase {
public static final DataStructureConverter<RowData, Row> CONVERTER =
(DataStructureConverter)
DataStructureConverters.getConverter(
- TypeConversions.fromLogicalToDataType(VALUE_TYPE));
+ TypeConversions.fromLogicalToDataType(TABLE_TYPE));
private static final int NUM_BUCKET = 3;
- private static final List<RowData> SOURCE_DATA =
+ public static final List<RowData> SOURCE_DATA =
Arrays.asList(
wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+ wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
@@ -104,8 +94,14 @@ public class FileStoreITCase extends AbstractTestBase {
private final boolean isBatch;
- public FileStoreITCase(boolean isBatch) {
+ private final StreamExecutionEnvironment env;
+
+ private final TableStore store;
+
+ public FileStoreITCase(boolean isBatch) throws IOException {
this.isBatch = isBatch;
+ this.env = isBatch ? buildBatchEnv() : buildStreamEnv();
+ this.store = buildTableStore(isBatch, TEMPORARY_FOLDER);
}
@Parameterized.Parameters(name = "isBatch-{0}")
@@ -114,83 +110,103 @@ public class FileStoreITCase extends AbstractTestBase {
}
private static SerializableRowData wrap(RowData row) {
-        return new SerializableRowData(row, InternalSerializers.create(VALUE_TYPE));
+        return new SerializableRowData(row, InternalSerializers.create(TABLE_TYPE));
}
@Test
public void testPartitioned() throws Exception {
- innerTest(true);
+ store.withPartitions(new int[] {1});
+
+ // write
+ store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+ env.execute();
+
+ // read
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+
+ // assert
+ Row[] expected =
+ new Row[] {
+                    Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)
+ };
+ assertThat(results).containsExactlyInAnyOrder(expected);
}
@Test
public void testNonPartitioned() throws Exception {
- innerTest(false);
+ store.withPartitions(new int[0]);
+
+ // write
+ store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+ env.execute();
+
+ // read
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+
+ // assert
+        Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+ assertThat(results).containsExactlyInAnyOrder(expected);
}
@Test
public void testOverwrite() throws Exception {
Assume.assumeTrue(isBatch);
+ store.withPartitions(new int[] {1});
- StreamExecutionEnvironment env = buildBatchEnv();
- FileStore fileStore =
-                buildFileStore(buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), true);
-
- // sink
- DataStreamSource<RowData> finiteSource = buildTestSource(env, true);
- write(finiteSource, fileStore, true);
+ // write
+ store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+ env.execute();
// overwrite p2
- finiteSource =
+ DataStreamSource<RowData> partialData =
env.fromCollection(
Collections.singletonList(
wrap(GenericRowData.of(9, StringData.fromString("p2"), 5))),
- InternalTypeInfo.of(VALUE_TYPE));
+ InternalTypeInfo.of(TABLE_TYPE));
Map<String, String> overwrite = new HashMap<>();
overwrite.put("p", "p2");
- write(finiteSource, fileStore, true, overwrite);
+        store.sinkBuilder().withInput(partialData).withOverwritePartition(overwrite).build();
+ env.execute();
// read
- List<Row> results = read(env, fileStore);
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
assertThat(results).containsExactlyInAnyOrder(expected);
// overwrite all
- finiteSource =
+ partialData =
env.fromCollection(
Collections.singletonList(
wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
- InternalTypeInfo.of(VALUE_TYPE));
- write(finiteSource, fileStore, true, new HashMap<>());
+ InternalTypeInfo.of(TABLE_TYPE));
+        store.sinkBuilder().withInput(partialData).withOverwritePartition(new HashMap<>()).build();
+ env.execute();
// read
- results = read(env, fileStore);
+ results = executeAndCollect(store.sourceBuilder().build(env));
expected = new Row[] {Row.of(19, "p2", 6)};
assertThat(results).containsExactlyInAnyOrder(expected);
}
- private void innerTest(boolean partitioned) throws Exception {
-        StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
- FileStore fileStore =
- buildFileStore(
-                        buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), partitioned);
-
- // sink
- DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
- write(finiteSource, fileStore, partitioned);
+ @Test
+ public void testPartitionedNonKey() throws Exception {
+ store.withPartitions(new int[] {1}).withPrimaryKeys(new int[0]);
- // source
- List<Row> results = read(env, fileStore);
+ // write
+ store.sinkBuilder().withInput(buildTestSource(env, isBatch)).build();
+ env.execute();
- Row[] expected =
- partitioned
- ? new Row[] {
- Row.of(5, "p2", 1),
- Row.of(3, "p2", 5),
- Row.of(5, "p1", 1),
- Row.of(0, "p1", 2)
- }
-                        : new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+ // read
+        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+
+ // assert
+ // in streaming mode, expect origin data X 2 (FiniteTestSource)
+ Stream<RowData> expectedStream =
+ isBatch
+ ? SOURCE_DATA.stream()
+                        : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
+        Row[] expected = expectedStream.map(CONVERTER::toExternal).toArray(Row[]::new);
assertThat(results).containsExactlyInAnyOrder(expected);
}
@@ -209,6 +225,14 @@ public class FileStoreITCase extends AbstractTestBase {
return env;
}
+    public static TableStore buildTableStore(boolean noFail, TemporaryFolder temporaryFolder)
+            throws IOException {
+        return new TableStore(buildConfiguration(noFail, temporaryFolder.newFolder()))
+                .withSchema(TABLE_TYPE)
+                .withPrimaryKeys(new int[] {2})
+                .withTableIdentifier(ObjectIdentifier.of("catalog", "db", "t"));
+ }
+
public static Configuration buildConfiguration(boolean noFail, File folder) {
Configuration options = new Configuration();
options.set(BUCKET, NUM_BUCKET);
@@ -222,72 +246,16 @@ public class FileStoreITCase extends AbstractTestBase {
return options;
}
-    public static FileStore buildFileStore(Configuration options, boolean partitioned) {
- return new FileStoreImpl(
- options,
- "user",
- partitioned ? PARTITION_TYPE : RowType.of(),
- KEY_TYPE,
- VALUE_TYPE,
- new DeduplicateAccumulator());
- }
-
public static DataStreamSource<RowData> buildTestSource(
StreamExecutionEnvironment env, boolean isBatch) {
return isBatch
-                ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(VALUE_TYPE))
+                ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE))
: env.addSource(
-                        new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(VALUE_TYPE));
- }
-
-    public static void write(DataStream<RowData> input, FileStore fileStore, boolean partitioned)
- throws Exception {
- write(input, fileStore, partitioned, null);
- }
-
- public static void write(
- DataStream<RowData> input,
- FileStore fileStore,
- boolean partitioned,
- @Nullable Map<String, String> overwritePartition)
- throws Exception {
- write(input, fileStore, partitioned, overwritePartition, null);
- }
-
- public static void write(
- DataStream<RowData> input,
- FileStore fileStore,
- boolean partitioned,
- @Nullable Map<String, String> overwritePartition,
- @Nullable LogSinkProvider logSinkProvider)
- throws Exception {
- int[] partitions = partitioned ? new int[] {1} : new int[0];
- int[] keys = new int[] {2};
- StoreSink<?, ?> sink =
- new StoreSink<>(
- null,
- fileStore,
- partitions,
- keys,
- NUM_BUCKET,
- null,
- overwritePartition,
- logSinkProvider);
- input = input.keyBy(row -> row.getInt(2)); // key by
- GlobalCommittingSinkTranslator.translate(input, sink);
- input.getExecutionEnvironment().execute();
+                        new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(TABLE_TYPE));
}
-    public static List<Row> read(StreamExecutionEnvironment env, FileStore fileStore)
- throws Exception {
-        FileStoreSource source = new FileStoreSource(fileStore, false, null, null, null);
- CloseableIterator<RowData> iterator =
- env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- "source",
- InternalTypeInfo.of(VALUE_TYPE))
- .executeAndCollect();
+    public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
+ CloseableIterator<RowData> iterator = source.executeAndCollect();
List<Row> results = new ArrayList<>();
while (iterator.hasNext()) {
results.add(CONVERTER.toExternal(iterator.next()));
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 023b4b6..c4e8eb4 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -18,13 +18,10 @@
package org.apache.flink.table.store.connector.sink;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.store.connector.FileStoreITCase;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.connector.TableStore;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
@@ -37,12 +34,16 @@ import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static org.apache.flink.table.store.connector.FileStoreITCase.CONVERTER;
+import static org.apache.flink.table.store.connector.FileStoreITCase.SOURCE_DATA;
+import static org.apache.flink.table.store.connector.FileStoreITCase.TABLE_TYPE;
import static org.apache.flink.table.store.connector.FileStoreITCase.buildBatchEnv;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildConfiguration;
-import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileStore;
import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
+import static org.apache.flink.table.store.connector.FileStoreITCase.buildTableStore;
import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
+import static org.apache.flink.table.store.connector.FileStoreITCase.executeAndCollect;
import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
@@ -53,33 +54,48 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
@Test
public void testStreamingPartitioned() throws Exception {
- innerTest("testStreamingPartitioned", false, true, true);
+ innerTest("testStreamingPartitioned", false, true, true, true);
}
@Test
public void testStreamingNonPartitioned() throws Exception {
- innerTest("testStreamingNonPartitioned", false, false, true);
+ innerTest("testStreamingNonPartitioned", false, false, true, true);
}
@Test
public void testBatchPartitioned() throws Exception {
- innerTest("testBatchPartitioned", true, true, true);
+ innerTest("testBatchPartitioned", true, true, true, true);
}
@Test
public void testStreamingEventual() throws Exception {
- innerTest("testStreamingEventual", false, true, false);
+ innerTest("testStreamingEventual", false, true, false, true);
}
-    private void innerTest(String name, boolean isBatch, boolean partitioned, boolean transaction)
+ @Test
+ public void testStreamingPartitionedNonKey() throws Exception {
+ innerTest("testStreamingPartitionedNonKey", false, true, true, false);
+ }
+
+ @Test
+ public void testBatchPartitionedNonKey() throws Exception {
+ innerTest("testBatchPartitionedNonKey", true, true, true, false);
+ }
+
+ private void innerTest(
+            String name, boolean isBatch, boolean partitioned, boolean transaction, boolean keyed)
throws Exception {
StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
// in eventual mode, failure will result in duplicate data
- FileStore fileStore =
- buildFileStore(
-                        buildConfiguration(isBatch || !transaction, TEMPORARY_FOLDER.newFolder()),
-                        partitioned);
+        TableStore store = buildTableStore(isBatch || !transaction, TEMPORARY_FOLDER);
+ if (partitioned) {
+ store.withPartitions(new int[] {1});
+ }
+
+ if (!keyed) {
+ store.withPrimaryKeys(new int[0]);
+ }
// prepare log
DynamicTableFactory.Context context =
@@ -90,8 +106,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
transaction
? LogOptions.LogConsistency.TRANSACTIONAL
: LogOptions.LogConsistency.EVENTUAL,
- FileStoreITCase.VALUE_TYPE,
- new int[] {2});
+ TABLE_TYPE,
+ keyed ? new int[] {2} : new int[0]);
KafkaLogStoreFactory factory = discoverKafkaLogFactory();
KafkaLogSinkProvider sinkProvider =
factory.createSinkProvider(context, SINK_CONTEXT);
@@ -102,64 +118,44 @@ public class LogStoreSinkITCase extends
KafkaTableTestBase {
try {
// write
-            DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
-            FileStoreITCase.write(finiteSource, fileStore, partitioned, null, sinkProvider);
+ store.sinkBuilder()
+ .withInput(buildTestSource(env, isBatch))
+ .withLogSinkProvider(sinkProvider)
+ .build();
+ env.execute();
// read
- List<Row> results = FileStoreITCase.read(env, fileStore);
-
- Row[] expected =
- partitioned
- ? new Row[] {
- Row.of(5, "p2", 1),
- Row.of(3, "p2", 5),
- Row.of(5, "p1", 1),
- Row.of(0, "p1", 2)
- }
- : new Row[] {
-                            Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
- };
- assertThat(results).containsExactlyInAnyOrder(expected);
+            List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
- results =
- buildStreamEnv()
- .fromSource(
- sourceProvider.createSource(null),
- WatermarkStrategy.noWatermarks(),
- "source")
- .executeAndCollect(isBatch ? 6 : 12).stream()
- .map(FileStoreITCase.CONVERTER::toExternal)
- .collect(Collectors.toList());
-
- if (isBatch) {
+ Row[] expected;
+ if (keyed) {
expected =
- new Row[] {
- Row.of(0, "p1", 1),
- Row.of(0, "p1", 2),
- Row.of(5, "p1", 1),
- Row.of(6, "p2", 1),
- Row.of(3, "p2", 5),
- Row.of(5, "p2", 1)
- };
+ partitioned
+ ? new Row[] {
+ Row.of(5, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p1", 1),
+ Row.of(0, "p1", 2)
+ }
+ : new Row[] {
+                            Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)
+ };
} else {
- // read log
- // expect origin data X 2 (FiniteTestSource)
- expected =
- new Row[] {
- Row.of(0, "p1", 1),
- Row.of(0, "p1", 2),
- Row.of(5, "p1", 1),
- Row.of(6, "p2", 1),
- Row.of(3, "p2", 5),
- Row.of(5, "p2", 1),
- Row.of(0, "p1", 1),
- Row.of(0, "p1", 2),
- Row.of(5, "p1", 1),
- Row.of(6, "p2", 1),
- Row.of(3, "p2", 5),
- Row.of(5, "p2", 1)
- };
+ Stream<RowData> expectedStream =
+ isBatch
+ ? SOURCE_DATA.stream()
+                            : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
+            expected = expectedStream.map(CONVERTER::toExternal).toArray(Row[]::new);
}
+
+ assertThat(results).containsExactlyInAnyOrder(expected);
+
+ results =
+ store.sourceBuilder().withContinuousMode(true)
+                            .withLogSourceProvider(sourceProvider).build(buildStreamEnv())
+ .executeAndCollect(expected.length).stream()
+ .map(CONVERTER::toExternal)
+ .collect(Collectors.toList());
assertThat(results).containsExactlyInAnyOrder(expected);
} finally {
factory.onDropTable(context, true);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index 879d908..5112244 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -38,7 +38,7 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void serializeEmptyCheckpoint() throws Exception {
final PendingSplitsCheckpoint checkpoint =
- PendingSplitsCheckpoint.fromStatic(Collections.emptyList());
+ new PendingSplitsCheckpoint(Collections.emptyList(), 5);
final PendingSplitsCheckpoint deSerialized =
serializeAndDeserialize(checkpoint);
@@ -48,8 +48,8 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void serializeSomeSplits() throws Exception {
final PendingSplitsCheckpoint checkpoint =
- PendingSplitsCheckpoint.fromStatic(
-                        Arrays.asList(testSplit1(), testSplit2(), testSplit3()));
+                new PendingSplitsCheckpoint(
+                        Arrays.asList(testSplit1(), testSplit2(), testSplit3()), 3);
final PendingSplitsCheckpoint deSerialized =
serializeAndDeserialize(checkpoint);
@@ -59,7 +59,7 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void serializeSplitsAndContinuous() throws Exception {
final PendingSplitsCheckpoint checkpoint =
- PendingSplitsCheckpoint.fromContinuous(
+ new PendingSplitsCheckpoint(
Arrays.asList(testSplit1(), testSplit2(), testSplit3()), 20);
final PendingSplitsCheckpoint deSerialized =
serializeAndDeserialize(checkpoint);
@@ -70,7 +70,7 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void repeatedSerialization() throws Exception {
final PendingSplitsCheckpoint checkpoint =
-                PendingSplitsCheckpoint.fromStatic(Arrays.asList(testSplit3(), testSplit1()));
+                new PendingSplitsCheckpoint(Arrays.asList(testSplit3(), testSplit1()), 5);
serializeAndDeserialize(checkpoint);
serializeAndDeserialize(checkpoint);
@@ -112,6 +112,6 @@ public class PendingSplitsCheckpointSerializerTest {
private static void assertCheckpointsEqual(
final PendingSplitsCheckpoint expected, final PendingSplitsCheckpoint actual) {
assertThat(actual.splits()).isEqualTo(expected.splits());
-        assertThat(actual.nextSnapshotId()).isEqualTo(expected.nextSnapshotId());
+        assertThat(actual.currentSnapshotId()).isEqualTo(expected.currentSnapshotId());
}
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
index 96c6bd6..7fe861d 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -102,7 +102,6 @@ public class StaticFileStoreSplitEnumeratorTest {
private static StaticFileStoreSplitEnumerator createEnumerator(
final SplitEnumeratorContext<FileStoreSourceSplit> context,
final FileStoreSourceSplit... splits) {
-
-        return new StaticFileStoreSplitEnumerator(context, Arrays.asList(splits));
+        return new StaticFileStoreSplitEnumerator(context, null, Arrays.asList(splits));
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index ab8e6ce..06dad34 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
@@ -37,6 +38,8 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
/** Scan operation which produces a plan. */
public interface FileStoreScan {
+ Snapshot snapshot(long snapshotId);
+
FileStoreScan withPartitionFilter(Predicate predicate);
FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index d99a792..b5d1370 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -75,6 +75,11 @@ public class FileStoreScanImpl implements FileStoreScan {
}
@Override
+ public Snapshot snapshot(long snapshotId) {
+ return Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+ }
+
+ @Override
public FileStoreScan withPartitionFilter(Predicate predicate) {
this.partitionFilter = predicate;
return this;
@@ -154,8 +159,7 @@ public class FileStoreScanImpl implements FileStoreScan {
if (snapshotId == null) {
manifests = Collections.emptyList();
} else {
-        Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
- manifests = snapshot.readAllManifests(manifestList);
+            manifests = snapshot(snapshotId).readAllManifests(manifestList);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 55c1bf8..0413535 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -49,12 +49,20 @@ public class SinkRecordConverter {
public SinkRecord convert(RowData row) {
BinaryRowData partition = partProjection.apply(row);
- BinaryRowData key = keyProjection.apply(row);
- int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
- int bucket = Math.abs(hash % numBucket);
+ BinaryRowData key = key(row);
+ int bucket = bucket(row, key);
return new SinkRecord(partition, bucket, key, row);
}
+ public BinaryRowData key(RowData row) {
+ return keyProjection.apply(row);
+ }
+
+ public int bucket(RowData row, BinaryRowData key) {
+ int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
+ return Math.abs(hash % numBucket);
+ }
+
private int hashRow(RowData row) {
if (row instanceof BinaryRowData) {
RowKind rowKind = row.getRowKind();