This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a472c23 [FLINK-26276] Introduce ITCases with bug fixes
a472c23 is described below
commit a472c23e9e6aa0900360f3bc12b7e3e325bec318
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 1 11:48:09 2022 +0800
[FLINK-26276] Introduce ITCases with bug fixes
This closes #23
---
flink-table-store-connector/pom.xml | 20 ++
.../table/store/connector/sink/StoreSink.java | 2 +-
.../store/connector/sink/StoreSinkWriter.java | 22 +-
.../sink/global/GlobalCommitterOperator.java | 32 ++-
.../sink/global/GlobalCommittingSink.java | 22 +-
.../global/GlobalCommittingSinkTranslator.java | 24 +-
.../table/store/connector/FileStoreITCase.java | 291 +++++++++++++++++++++
.../table/store/connector/SerializableRowData.java | 154 +++++++++++
.../table/store/connector/sink/StoreSinkTest.java | 2 +-
.../sink/global/GlobalCommitterOperatorTest.java | 4 +-
.../src/test/resources/log4j2-test.properties | 28 ++
.../flink/table/store/file/FileStoreImpl.java | 145 ++++++++++
.../flink/table/store/file/FileStoreOptions.java | 106 ++++++--
.../store/file/manifest/ManifestFileMeta.java | 8 +-
.../store/file/operation/FileStoreCommitImpl.java | 38 +--
.../store/file/operation/FileStoreScanImpl.java | 72 +++--
.../store/file/utils/FileStorePathFactory.java | 10 +
.../flink/table/store/file/utils/TypeUtils.java | 7 +-
.../store/file/operation/OperationTestUtils.java | 14 +-
.../src/test/resources/log4j2-test.properties | 28 ++
.../src/test/resources/log4j2-test.xml | 29 --
21 files changed, 912 insertions(+), 146 deletions(-)
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 70ccea3..f05cd72 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -121,6 +121,26 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit4.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <version>${junit5.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index c61ce0c..1b980c6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -104,7 +104,7 @@ public class StoreSink<WriterStateT, LogCommT>
}
@Override
- public StoreGlobalCommitter<LogCommT> createCommitter() throws IOException {
+ public StoreGlobalCommitter<LogCommT> createGlobalCommitter() {
FileStoreCommit commit = fileStore.newCommit();
CatalogLock lock;
if (lockFactory == null) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index dc8c993..b85999c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -128,10 +129,17 @@ public class StoreSinkWriter<WriterStateT>
@Override
public List<Committable> prepareCommit() throws IOException {
List<Committable> committables = new ArrayList<>();
- for (BinaryRowData partition : writers.keySet()) {
- Map<Integer, RecordWriter> buckets = writers.get(partition);
- for (Integer bucket : buckets.keySet()) {
- RecordWriter writer = buckets.get(bucket);
+ Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
+ writers.entrySet().iterator();
+ while (partIter.hasNext()) {
+ Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
+ BinaryRowData partition = partEntry.getKey();
+ Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
+ partEntry.getValue().entrySet().iterator();
+ while (bucketIter.hasNext()) {
+ Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
+ int bucket = entry.getKey();
+ RecordWriter writer = entry.getValue();
FileCommittable committable;
try {
committable = new FileCommittable(partition, bucket, writer.prepareCommit());
@@ -149,12 +157,12 @@ public class StoreSinkWriter<WriterStateT>
// such as yesterday's partition that no longer needs to be written.
if (committable.increment().newFiles().isEmpty()) {
closeWriter(writer);
- buckets.remove(bucket);
+ bucketIter.remove();
}
}
- if (buckets.isEmpty()) {
- writers.remove(partition);
+ if (partEntry.getValue().isEmpty()) {
+ partIter.remove();
}
}
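Note on the rewrite above: removing a map entry while iterating over keySet() throws ConcurrentModificationException, so prepareCommit() now walks explicit entry iterators and prunes empty buckets and partitions through Iterator.remove(). A minimal standalone sketch of the same pattern (hypothetical names, not the project's API):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class PruneWhileIterating {
        public static void main(String[] args) {
            Map<String, Integer> newFilesPerBucket = new HashMap<>();
            newFilesPerBucket.put("bucket-0", 0); // idle bucket, should be dropped
            newFilesPerBucket.put("bucket-1", 3);
            Iterator<Map.Entry<String, Integer>> it = newFilesPerBucket.entrySet().iterator();
            while (it.hasNext()) {
                if (it.next().getValue() == 0) {
                    it.remove(); // safe: removal goes through the iterator, not the map
                }
            }
            System.out.println(newFilesPerBucket); // prints {bucket-1=3}
        }
    }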
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
index ec48d27..36b1680 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
@@ -28,9 +28,11 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +60,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
* Aggregate committables to global committables and commit the global committables to the
* external system.
*/
- private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+ private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -69,31 +71,36 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
/** The committable's serializer. */
- private final SimpleVersionedSerializer<GlobalCommT> committableSerializer;
+ private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>>
+ committableSerializer;
/** The operator's state. */
private ListState<GlobalCommT> streamingCommitterState;
+ private GlobalCommitter<CommT, GlobalCommT> committer;
+
public GlobalCommitterOperator(
- GlobalCommitter<CommT, GlobalCommT> globalCommitter,
- SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
- this.globalCommitter = checkNotNull(globalCommitter);
+ SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory,
+ SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> committableSerializer) {
+ this.committerFactory = checkNotNull(committerFactory);
this.committableSerializer = committableSerializer;
this.committablesPerCheckpoint = new TreeMap<>();
+ setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
+ committer = committerFactory.get();
streamingCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
- committableSerializer);
+ committableSerializer.get());
List<GlobalCommT> restored = new ArrayList<>();
streamingCommitterState.get().forEach(restored::add);
streamingCommitterState.clear();
- globalCommitter.commit(globalCommitter.filterRecoveredCommittables(restored));
+ committer.commit(committer.filterRecoveredCommittables(restored));
}
@Override
@@ -103,7 +110,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
if (committables.size() > 0) {
committablesPerCheckpoint.put(
context.getCheckpointId(),
- globalCommitter.combine(context.getCheckpointId(), committables));
+ committer.combine(context.getCheckpointId(), committables));
}
streamingCommitterState.update(new ArrayList<>(committablesPerCheckpoint.values()));
}
@@ -112,9 +119,8 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
public void endInput() throws Exception {
List<CommT> allCommittables = pollCommittables();
if (!allCommittables.isEmpty()) {
- globalCommitter.commit(
- Collections.singletonList(
- globalCommitter.combine(Long.MAX_VALUE, allCommittables)));
+ committer.commit(
+ Collections.singletonList(committer.combine(Long.MAX_VALUE, allCommittables)));
}
}
@@ -124,7 +130,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
LOG.info("Committing the state for checkpoint {}", checkpointId);
NavigableMap<Long, GlobalCommT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
- globalCommitter.commit(new ArrayList<>(headMap.values()));
+ committer.commit(new ArrayList<>(headMap.values()));
headMap.clear();
}
@@ -138,7 +144,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
@Override
public void close() throws Exception {
- globalCommitter.close();
+ committer.close();
committablesPerCheckpoint.clear();
committables.clear();
super.close();
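The operator now takes SerializableSupplier factories instead of a live GlobalCommitter and serializer: stream operators are Java-serialized into the job graph, so a non-serializable committer has to be built lazily in initializeState() on the task manager. A simplified sketch of that pattern with stand-in types (not the classes from this commit):

    import java.io.Serializable;
    import java.util.function.Supplier;

    public class FactorySketch {
        // Flink's SerializableSupplier is essentially Supplier plus Serializable.
        interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

        static class Operator implements Serializable {
            private final SerializableSupplier<StringBuilder> committerFactory;
            private transient StringBuilder committer; // rebuilt after deserialization

            Operator(SerializableSupplier<StringBuilder> committerFactory) {
                this.committerFactory = committerFactory;
            }

            void initializeState() {
                committer = committerFactory.get(); // created on the task manager
            }
        }

        public static void main(String[] args) {
            Operator op = new Operator(StringBuilder::new);
            op.initializeState();
        }
    }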
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
index 66b4cd6..4b2b512 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
@@ -18,13 +18,12 @@
package org.apache.flink.table.store.connector.sink.global;
+import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import java.io.IOException;
-
/**
* A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
* consists of a {@link SinkWriter} that performs the precommits and a {@link GlobalCommitter} that
@@ -34,22 +33,19 @@ import java.io.IOException;
* @param <CommT> The type of the committables.
* @param <GlobalCommT> The type of the aggregated committable.
*/
-public interface GlobalCommittingSink<InputT, CommT, GlobalCommT> extends Sink<InputT> {
+public interface GlobalCommittingSink<InputT, CommT, GlobalCommT>
+ extends TwoPhaseCommittingSink<InputT, CommT> {
- /**
- * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
- * input.
- */
- PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+ @Override
+ default Committer<CommT> createCommitter() {
+ throw new UnsupportedOperationException("Please create global committer.");
+ }
/**
* Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
* through {@link GlobalCommitter#commit}.
*/
- GlobalCommitter<CommT, GlobalCommT> createCommitter() throws IOException;
-
- /** Returns the serializer of the committable type. */
- SimpleVersionedSerializer<CommT> getCommittableSerializer();
+ GlobalCommitter<CommT, GlobalCommT> createGlobalCommitter();
/** Returns the serializer of the global committable type. */
SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer();
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
index 4d83149..e547642 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
-import java.io.IOException;
-
/** A translator for the {@link GlobalCommittingSink}. */
public class GlobalCommittingSinkTranslator {
@@ -38,12 +36,24 @@ public class GlobalCommittingSinkTranslator {
private static final String WRITER_NAME = "Writer";
public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
- DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink)
- throws IOException {
+ DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink) {
TypeInformation<CommittableMessage<CommT>> commitType =
CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
+
+ boolean checkpointingEnabled =
+ input.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled();
+
+ // We cannot determine the mode when the execution mode is auto.
+ // We set isBatch to false and only use checkpointingEnabled to determine if we want to do
+ // the final commit.
+ // When isBatch is true, only the checkpointID is different, which has no effect on the
+ // commit operator.
+
SingleOutputStreamOperator<CommittableMessage<CommT>> written =
- input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink));
+ input.transform(
+ WRITER_NAME,
+ commitType,
+ new SinkWriterOperatorFactory<>(sink, false, checkpointingEnabled));
SingleOutputStreamOperator<Void> committed =
written.global()
@@ -51,8 +61,8 @@ public class GlobalCommittingSinkTranslator {
GLOBAL_COMMITTER_NAME,
Types.VOID,
new GlobalCommitterOperator<>(
- sink.createCommitter(),
- sink.getGlobalCommittableSerializer()))
+ sink::createGlobalCommitter,
+ sink::getGlobalCommittableSerializer))
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
new file mode 100644
index 0000000..8e7aa42
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.connector.sink.StoreSink;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
+import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link FileStoreSource} and {@link StoreSink}. */
+@RunWith(Parameterized.class)
+public class FileStoreITCase extends AbstractTestBase {
+
+ private static final RowType PARTITION_TYPE =
+ new RowType(Collections.singletonList(new RowType.RowField("p",
new VarCharType())));
+
+ private static final RowType KEY_TYPE =
+ new RowType(Collections.singletonList(new RowType.RowField("k",
new IntType())));
+
+ private static final RowType VALUE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("v", new IntType()),
+ new RowType.RowField("p", new VarCharType()),
+ // rename key
+ new RowType.RowField("_k", new IntType())));
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static final DataStructureConverter<RowData, Row> CONVERTER =
+ (DataStructureConverter)
+ DataStructureConverters.getConverter(
+ TypeConversions.fromLogicalToDataType(VALUE_TYPE));
+
+ private static final int NUM_BUCKET = 3;
+
+ private static final List<RowData> SOURCE_DATA =
+ Arrays.asList(
+ wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+ wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+ wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
+ wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
+ wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
+ wrap(GenericRowData.of(5, StringData.fromString("p2"),
1)));
+
+ private final boolean isBatch;
+
+ public FileStoreITCase(boolean isBatch) {
+ this.isBatch = isBatch;
+ }
+
+ @Parameterized.Parameters(name = "isBatch-{0}")
+ public static List<Boolean> getVarSeg() {
+ return Arrays.asList(true, false);
+ }
+
+ private static SerializableRowData wrap(RowData row) {
+ return new SerializableRowData(row, InternalSerializers.create(VALUE_TYPE));
+ }
+
+ @Test
+ public void testPartitioned() throws Exception {
+ innerTest(true);
+ }
+
+ @Test
+ public void testNonPartitioned() throws Exception {
+ innerTest(false);
+ }
+
+ @Test
+ public void testOverwrite() throws Exception {
+ Assume.assumeTrue(isBatch);
+
+ StreamExecutionEnvironment env = buildBatchEnv();
+ FileStore fileStore =
+ buildFileStore(buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), true);
+
+ // sink
+ DataStreamSource<RowData> finiteSource = buildTestSource(env, true);
+ write(finiteSource, fileStore, true);
+
+ // overwrite p2
+ finiteSource =
+ env.fromCollection(
+ Collections.singletonList(
+ wrap(GenericRowData.of(9, StringData.fromString("p2"), 5))),
+ InternalTypeInfo.of(VALUE_TYPE));
+ Map<String, String> overwrite = new HashMap<>();
+ overwrite.put("p", "p2");
+ write(finiteSource, fileStore, true, overwrite);
+
+ // read
+ List<Row> results = read(env, fileStore, true);
+
+ Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
+ assertThat(results).containsExactlyInAnyOrder(expected);
+
+ // overwrite all
+ finiteSource =
+ env.fromCollection(
+ Collections.singletonList(
+ wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
+ InternalTypeInfo.of(VALUE_TYPE));
+ write(finiteSource, fileStore, true, new HashMap<>());
+
+ // read
+ results = read(env, fileStore, true);
+ expected = new Row[] {Row.of(19, "p2", 6)};
+ assertThat(results).containsExactlyInAnyOrder(expected);
+ }
+
+ private void innerTest(boolean partitioned) throws Exception {
+ StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
+ FileStore fileStore =
+ buildFileStore(
+ buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), partitioned);
+
+ // sink
+ DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
+ write(finiteSource, fileStore, partitioned);
+
+ // source
+ List<Row> results = read(env, fileStore, partitioned);
+
+ Row[] expected =
+ partitioned
+ ? new Row[] {
+ Row.of(5, "p2", 1),
+ Row.of(3, "p2", 5),
+ Row.of(5, "p1", 1),
+ Row.of(0, "p1", 2)
+ }
+ : new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+ assertThat(results).containsExactlyInAnyOrder(expected);
+ }
+
+ public static StreamExecutionEnvironment buildStreamEnv() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(100);
+ env.setParallelism(2);
+ return env;
+ }
+
+ public static StreamExecutionEnvironment buildBatchEnv() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(2);
+ return env;
+ }
+
+ public static Configuration buildConfiguration(boolean isBatch, File folder) {
+ Configuration options = new Configuration();
+ options.set(BUCKET, NUM_BUCKET);
+ if (isBatch) {
+ options.set(FILE_PATH, folder.toURI().toString());
+ } else {
+ options.set(FILE_PATH, "fail://" + folder.getPath());
+ // FailingAtomicRenameFileSystem.setFailPossibility(20);
+ }
+ options.set(FILE_FORMAT, "avro");
+ return options;
+ }
+
+ public static FileStore buildFileStore(Configuration options, boolean partitioned) {
+ return new FileStoreImpl(
+ options,
+ "user",
+ partitioned ? PARTITION_TYPE : RowType.of(),
+ KEY_TYPE,
+ VALUE_TYPE,
+ new DeduplicateAccumulator());
+ }
+
+ public static DataStreamSource<RowData> buildTestSource(
+ StreamExecutionEnvironment env, boolean isBatch) {
+ return isBatch
+ ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(VALUE_TYPE))
+ : env.addSource(
+ new FiniteTestSource<>(null, SOURCE_DATA), InternalTypeInfo.of(VALUE_TYPE));
+ }
+
+ public static void write(DataStream<RowData> input, FileStore fileStore, boolean partitioned)
+ throws Exception {
+ write(input, fileStore, partitioned, null);
+ }
+
+ public static void write(
+ DataStream<RowData> input,
+ FileStore fileStore,
+ boolean partitioned,
+ @Nullable Map<String, String> overwritePartition)
+ throws Exception {
+ int[] partitions = partitioned ? new int[] {1} : new int[0];
+ int[] keys = new int[] {2};
+ StoreSink<?, ?> sink =
+ new StoreSink<>(
+ null,
+ fileStore,
+ VALUE_TYPE,
+ partitions,
+ keys,
+ NUM_BUCKET,
+ null,
+ overwritePartition);
+ input = input.keyBy(row -> row.getInt(2)); // key by
+ GlobalCommittingSinkTranslator.translate(input, sink);
+ input.getExecutionEnvironment().execute();
+ }
+
+ public static List<Row> read(
+ StreamExecutionEnvironment env, FileStore fileStore, boolean partitioned)
+ throws Exception {
+ int[] partitions = partitioned ? new int[] {1} : new int[0];
+ int[] keys = new int[] {2};
+ FileStoreSource source =
+ new FileStoreSource(fileStore, VALUE_TYPE, partitions, keys, null, null, null);
+ CloseableIterator<RowData> iterator =
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "source",
+ InternalTypeInfo.of(VALUE_TYPE))
+ .executeAndCollect();
+ List<Row> results = new ArrayList<>();
+ while (iterator.hasNext()) {
+ results.add(CONVERTER.toExternal(iterator.next()));
+ }
+ iterator.close();
+ return results;
+ }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SerializableRowData.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SerializableRowData.java
new file mode 100644
index 0000000..8a315f3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SerializableRowData.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/** A {@link RowData} that implements {@link Serializable}. */
+public class SerializableRowData implements RowData, Serializable {
+
+ private final TypeSerializer<RowData> serializer;
+
+ private transient RowData row;
+
+ public SerializableRowData(RowData row, TypeSerializer<RowData> serializer) {
+ this.row = row;
+ this.serializer = serializer;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ serializer.serialize(row, new DataOutputViewStreamWrapper(out));
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ row = serializer.deserialize(new DataInputViewStreamWrapper(in));
+ }
+
+ @Override
+ public int getArity() {
+ return row.getArity();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return row.getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind rowKind) {
+ row.setRowKind(rowKind);
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return row.isNullAt(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return row.getBoolean(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return row.getByte(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ return row.getShort(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ return row.getInt(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ return row.getLong(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return row.getFloat(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return row.getDouble(i);
+ }
+
+ @Override
+ public StringData getString(int i) {
+ return row.getString(i);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return row.getDecimal(i, precision, scale);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int i, int precision) {
+ return row.getTimestamp(i, precision);
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int i) {
+ return row.getRawValue(i);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return row.getBinary(i);
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ return row.getArray(i);
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ return row.getMap(i);
+ }
+
+ @Override
+ public RowData getRow(int i, int rowArity) {
+ return row.getRow(i, rowArity);
+ }
+}
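SerializableRowData above makes a non-serializable RowData safe to ship through Java serialization (as FiniteTestSource requires) by keeping the row transient and re-encoding it with the TypeSerializer inside the writeObject/readObject hooks. A generic sketch of that hook with stand-in types, not Flink's:

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class TransientFieldWrapper implements Serializable {
        private transient int[] payload; // pretend this field's type is not Serializable

        public TransientFieldWrapper(int[] payload) {
            this.payload = payload;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject(); // the non-transient fields, as usual
            out.writeInt(payload.length);
            for (int v : payload) {
                out.writeInt(v); // hand-encode the transient field
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            payload = new int[in.readInt()]; // hand-decode it back
            for (int i = 0; i < payload.length; i++) {
                payload[i] = in.readInt();
            }
        }
    }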
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index f469ce6..403cb6c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -179,7 +179,7 @@ public class StoreSinkTest {
}
private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
- StoreGlobalCommitter committer = sink.createCommitter();
+ StoreGlobalCommitter committer = sink.createGlobalCommitter();
GlobalCommittable<?> committable = committer.combine(0, fileCommittables);
fileStore.expired = false;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
index 77176ef..3bbead5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -147,7 +147,7 @@ public class GlobalCommitterOperatorTest {
testHarness.open();
testHarness.snapshot(1L, 1L);
testHarness.notifyOfCompletedCheckpoint(1L);
- assertThat(globalCommitter.getCommittedData().isEmpty()).isTrue();
+ assertThat(globalCommitter.getCommittedData()).isEmpty();
testHarness.close();
}
@@ -175,7 +175,7 @@ public class GlobalCommitterOperatorTest {
GlobalCommitter<String, String> globalCommitter) throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new GlobalCommitterOperator<>(
- globalCommitter, StringCommittableSerializer.INSTANCE),
+ () -> globalCommitter, () -> StringCommittableSerializer.INSTANCE),
CommittableMessageTypeInfo.of(
(SerializableSupplier<SimpleVersionedSerializer<String>>)
() -> StringCommittableSerializer.INSTANCE)
diff --git a/flink-table-store-connector/src/test/resources/log4j2-test.properties b/flink-table-store-connector/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-table-store-connector/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
new file mode 100644
index 0000000..52857fa
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
+import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
+import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
+import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
+import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.IntStream;
+
+/** File store implementation. */
+public class FileStoreImpl implements FileStore {
+
+ private final FileStoreOptions options;
+ private final String user;
+ private final RowType partitionType;
+ private final RowType keyType;
+ private final RowType valueType;
+ private final Accumulator accumulator;
+ private final GeneratedRecordComparator genRecordComparator;
+
+ public FileStoreImpl(
+ Configuration options,
+ String user,
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ Accumulator accumulator) {
+ this.options = new FileStoreOptions(options);
+ this.user = user;
+ this.partitionType = partitionType;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ this.accumulator = accumulator;
+ this.genRecordComparator =
+ new SortCodeGenerator(
+ new TableConfig(),
+ RowType.of(keyType.getChildren().toArray(new LogicalType[0])),
+ SortUtil.getAscendingSortSpec(
+ IntStream.range(0, keyType.getFieldCount()).toArray()))
+ .generateRecordComparator("KeyComparator");
+ }
+
+ private FileStorePathFactory pathFactory() {
+ return new FileStorePathFactory(
+ options.path(), partitionType, options.partitionDefaultName());
+ }
+
+ private ManifestFile.Factory manifestFileFactory() {
+ return new ManifestFile.Factory(
+ partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
+ }
+
+ private ManifestList.Factory manifestListFactory() {
+ return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
+ }
+
+ private RecordComparator newKeyComparator() {
+ return genRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
+ }
+
+ @Override
+ public FileStoreWriteImpl newWrite() {
+ return new FileStoreWriteImpl(
+ keyType,
+ valueType,
+ newKeyComparator(),
+ accumulator,
+ options.fileFormat(),
+ pathFactory(),
+ newScan(),
+ options.mergeTreeOptions());
+ }
+
+ @Override
+ public FileStoreReadImpl newRead() {
+ return new FileStoreReadImpl(
+ keyType,
+ valueType,
+ newKeyComparator(),
+ accumulator,
+ options.fileFormat(),
+ pathFactory());
+ }
+
+ @Override
+ public FileStoreCommitImpl newCommit() {
+ return new FileStoreCommitImpl(
+ user,
+ partitionType,
+ pathFactory(),
+ manifestFileFactory(),
+ manifestListFactory(),
+ newScan(),
+ options.bucket(),
+ options.manifestTargetSize(),
+ options.manifestMergeMinCount());
+ }
+
+ @Override
+ public FileStoreExpireImpl newExpire() {
+ return new FileStoreExpireImpl(
+ options.snapshotNumRetain(),
+ options.snapshotTimeRetain().toMillis(),
+ pathFactory(),
+ manifestListFactory(),
+ newScan());
+ }
+
+ @Override
+ public FileStoreScanImpl newScan() {
+ return new FileStoreScanImpl(
+ partitionType, pathFactory(), manifestFileFactory(), manifestListFactory());
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index ea6d0d6..a54526f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -20,11 +20,18 @@ package org.apache.flink.table.store.file;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
/** Options for {@link FileStore}. */
-public class FileStoreOptions {
+public class FileStoreOptions implements Serializable {
public static final ConfigOption<Integer> BUCKET =
ConfigOptions.key("bucket")
@@ -32,6 +39,24 @@ public class FileStoreOptions {
.defaultValue(1)
.withDescription("Bucket number for file store.");
+ public static final ConfigOption<String> FILE_PATH =
+ ConfigOptions.key("file.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The file path of the table store in the
filesystem.");
+
+ public static final ConfigOption<String> FILE_FORMAT =
+ ConfigOptions.key("file.format")
+ .stringType()
+ .defaultValue("orc")
+ .withDescription("Specify the message format of data
files.");
+
+ public static final ConfigOption<String> MANIFEST_FORMAT =
+ ConfigOptions.key("manifest.format")
+ .stringType()
+ .defaultValue("avro")
+ .withDescription("Specify the message format of manifest
files.");
+
public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
ConfigOptions.key("manifest.target-file-size")
.memoryType()
@@ -46,21 +71,72 @@ public class FileStoreOptions {
"To avoid frequent manifest merges, this parameter
specifies the minimum number "
+ "of ManifestFileMeta to merge.");
- public final int bucket;
- public final MemorySize manifestSuggestedSize;
- public final int manifestMergeMinCount;
+ public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
+ key("partition.default-name")
+ .stringType()
+ .defaultValue("__DEFAULT_PARTITION__")
+ .withDescription(
+ "The default partition name in case the dynamic
partition"
+ + " column value is null/empty string.");
+
+ public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED =
+ ConfigOptions.key("snapshot.num-retained")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDescription("The maximum number of completed
snapshots to retain.");
+
+ public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
+ ConfigOptions.key("snapshot.time-retained")
+ .durationType()
+ .defaultValue(Duration.ofDays(1))
+ .withDescription("The maximum time of completed snapshots
to retain.");
+
+ private final Configuration options;
+
+ public FileStoreOptions(Configuration options) {
+ this.options = options;
+ // TODO validate all keys
+ }
+
+ public int bucket() {
+ return options.get(BUCKET);
+ }
+
+ public Path path() {
+ return new Path(options.get(FILE_PATH));
+ }
+
+ public FileFormat fileFormat() {
+ return FileFormat.fromTableOptions(
+ Thread.currentThread().getContextClassLoader(), options, FILE_FORMAT);
+ }
+
+ public FileFormat manifestFormat() {
+ return FileFormat.fromTableOptions(
+ Thread.currentThread().getContextClassLoader(), options, MANIFEST_FORMAT);
+ }
+
+ public MemorySize manifestTargetSize() {
+ return options.get(MANIFEST_TARGET_FILE_SIZE);
+ }
+
+ public String partitionDefaultName() {
+ return options.get(PARTITION_DEFAULT_NAME);
+ }
+
+ public MergeTreeOptions mergeTreeOptions() {
+ return new MergeTreeOptions(options);
+ }
+
+ public int snapshotNumRetain() {
+ return options.get(SNAPSHOT_NUM_RETAINED);
+ }
- public FileStoreOptions(
- int bucket, MemorySize manifestSuggestedSize, int manifestMergeMinCount) {
- this.bucket = bucket;
- this.manifestSuggestedSize = manifestSuggestedSize;
- this.manifestMergeMinCount = manifestMergeMinCount;
+ public Duration snapshotTimeRetain() {
+ return options.get(SNAPSHOT_TIME_RETAINED);
}
- public FileStoreOptions(ReadableConfig config) {
- this(
- config.get(BUCKET),
- config.get(MANIFEST_TARGET_FILE_SIZE),
- config.get(MANIFEST_MERGE_MIN_COUNT));
+ public int manifestMergeMinCount() {
+ return options.get(MANIFEST_MERGE_MIN_COUNT);
}
}
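After this refactoring, FileStoreOptions holds the whole Configuration (which also makes it Serializable) and parses each option on access rather than in the constructor. A hedged usage sketch based only on the accessors defined above; illustrative, not canonical:

    // Option keys and accessors as introduced in this diff.
    Configuration conf = new Configuration();
    conf.set(FileStoreOptions.BUCKET, 4);
    conf.set(FileStoreOptions.FILE_PATH, "/tmp/table-store");
    conf.set(FileStoreOptions.FILE_FORMAT, "avro");

    FileStoreOptions options = new FileStoreOptions(conf);
    int numBucket = options.bucket(); // 4, read lazily from the configuration
    Path root = options.path();       // the file path wrapped as a Flink Path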
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index b3ad481..be1d629 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -170,10 +170,12 @@ public class ManifestFileMeta {
}
// both size and count conditions not satisfied, create new file from entries
- ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
result.addAll(candidate);
- newMetas.add(newManifestFileMeta);
- result.add(newManifestFileMeta);
+ if (entries.size() > 0) {
+ ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
+ newMetas.add(newManifestFileMeta);
+ result.add(newManifestFileMeta);
+ }
} catch (Throwable e) {
// exception occurs, clean up and rethrow
for (ManifestFileMeta manifest : newMetas) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 316b6c4..fe20b45 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -18,10 +18,10 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
@@ -77,7 +77,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final ManifestFile manifestFile;
private final ManifestList manifestList;
private final FileStoreScan scan;
- private final FileStoreOptions fileStoreOptions;
+ private final int numBucket;
+ private final MemorySize manifestTargetSize;
+ private final int manifestMergeMinCount;
@Nullable private Lock lock;
@@ -88,7 +90,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
FileStoreScan scan,
- FileStoreOptions fileStoreOptions) {
+ int numBucket,
+ MemorySize manifestTargetSize,
+ int manifestMergeMinCount) {
this.commitUser = commitUser;
this.partitionType = partitionType;
this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
@@ -96,7 +100,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.scan = scan;
- this.fileStoreOptions = fileStoreOptions;
+ this.numBucket = numBucket;
+ this.manifestTargetSize = manifestTargetSize;
+ this.manifestMergeMinCount = manifestMergeMinCount;
this.lock = null;
}
@@ -171,14 +177,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
// sanity check, all changes must be done within the given partition
Predicate partitionFilter = TypeUtils.partitionMapToPredicate(partition, partitionType);
- for (ManifestEntry entry : appendChanges) {
- if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
- throw new IllegalArgumentException(
- "Trying to overwrite partition "
- + partition.toString()
- + ", but the changes in "
- + pathFactory.getPartitionString(entry.partition())
- + " does not belong to this partition");
+ if (partitionFilter != null) {
+ for (ManifestEntry entry : appendChanges) {
+ if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
+ throw new IllegalArgumentException(
+ "Trying to overwrite partition "
+ + partition
+ + ", but the changes in "
+ + pathFactory.getPartitionString(entry.partition())
+ + " does not belong to this partition");
+ }
}
}
// overwrite new files
@@ -258,7 +266,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
kind,
entryWithPartition.getKey(),
entryWithBucket.getKey(),
- fileStoreOptions.bucket,
+ numBucket,
file))
.collect(Collectors.toList()));
}
@@ -307,8 +315,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
oldMetas,
changes,
manifestFile,
- fileStoreOptions.manifestSuggestedSize.getBytes(),
- fileStoreOptions.manifestMergeMinCount));
+ manifestTargetSize.getBytes(),
+ manifestMergeMinCount));
// prepare snapshot file
manifestListName = manifestList.write(newMetas);
newSnapshot =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index b75dc35..580d5af 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,12 +55,13 @@ public class FileStoreScanImpl implements FileStoreScan {
private final ManifestFile.Factory manifestFileFactory;
private final ManifestList manifestList;
- private Long snapshotId;
- private List<ManifestFileMeta> manifests;
private Predicate partitionFilter;
private Predicate keyFilter;
private Predicate valueFilter;
- private Integer bucket;
+
+ private Long specifiedSnapshotId = null;
+ private Integer specifiedBucket = null;
+ private List<ManifestFileMeta> specifiedManifests = null;
public FileStoreScanImpl(
RowType partitionType,
@@ -70,9 +72,6 @@ public class FileStoreScanImpl implements FileStoreScan {
this.pathFactory = pathFactory;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
-
- this.snapshotId = null;
- this.manifests = new ArrayList<>();
}
@Override
@@ -122,50 +121,54 @@ public class FileStoreScanImpl implements FileStoreScan {
@Override
public FileStoreScan withBucket(int bucket) {
- this.bucket = bucket;
+ this.specifiedBucket = bucket;
return this;
}
@Override
public FileStoreScan withSnapshot(long snapshotId) {
- this.snapshotId = snapshotId;
- Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
- this.manifests = manifestList.read(snapshot.manifestList());
+ this.specifiedSnapshotId = snapshotId;
+ if (specifiedManifests != null) {
+ throw new IllegalStateException("Cannot set both snapshot id and
manifests.");
+ }
return this;
}
@Override
public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
- this.manifests = manifests;
+ this.specifiedManifests = manifests;
+ if (specifiedSnapshotId != null) {
+ throw new IllegalStateException("Cannot set both snapshot id and
manifests.");
+ }
return this;
}
@Override
public Plan plan() {
- List<ManifestEntry> files = scan();
-
- return new Plan() {
- @Nullable
- @Override
- public Long snapshotId() {
- return snapshotId;
+ List<ManifestFileMeta> manifests = specifiedManifests;
+ Long snapshotId = specifiedSnapshotId;
+ if (manifests == null) {
+ if (snapshotId == null) {
+ snapshotId = pathFactory.latestSnapshotId();
}
-
- @Override
- public List<ManifestEntry> files() {
- return files;
+ if (snapshotId == null) {
+ manifests = Collections.emptyList();
+ } else {
+ Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+ manifests = manifestList.read(snapshot.manifestList());
}
- };
- }
+ }
+
+ final Long readSnapshot = snapshotId;
+ final List<ManifestFileMeta> readManifests = manifests;
- private List<ManifestEntry> scan() {
List<ManifestEntry> entries;
try {
entries =
FileUtils.COMMON_IO_FORK_JOIN_POOL
.submit(
() ->
- manifests
+ readManifests
.parallelStream()
.filter(this::filterManifestFileMeta)
.flatMap(m -> readManifestFileMeta(m).stream())
@@ -201,7 +204,20 @@ public class FileStoreScanImpl implements FileStoreScan {
"Unknown value kind " + entry.kind().name());
}
}
- return new ArrayList<>(map.values());
+ List<ManifestEntry> files = new ArrayList<>(map.values());
+
+ return new Plan() {
+ @Nullable
+ @Override
+ public Long snapshotId() {
+ return readSnapshot;
+ }
+
+ @Override
+ public List<ManifestEntry> files() {
+ return files;
+ }
+ };
}
private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
@@ -216,7 +232,7 @@ public class FileStoreScanImpl implements FileStoreScan {
// SstFile.RollingFile#finish
return (partitionFilter == null
|| partitionFilter.test(partitionConverter.convert(entry.partition())))
- && (bucket == null || entry.bucket() == bucket);
+ && (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
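Net effect of the FileStoreScanImpl rewrite: plan() now resolves what to read lazily and with an explicit precedence, instead of reading manifests eagerly in withSnapshot(). Restated as a sketch over the fields from this diff:

    // 1. an explicitly supplied manifest list wins;
    // 2. otherwise an explicitly supplied snapshot id;
    // 3. otherwise the latest snapshot on disk (may be null for an empty store);
    // 4. otherwise an empty plan.
    private List<ManifestFileMeta> resolveManifests() {
        if (specifiedManifests != null) {
            return specifiedManifests;
        }
        Long snapshotId =
                specifiedSnapshotId != null ? specifiedSnapshotId : pathFactory.latestSnapshotId();
        if (snapshotId == null) {
            return Collections.emptyList();
        }
        Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
        return manifestList.read(snapshot.manifestList());
    }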
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index ab6a384..be17080 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -130,7 +130,17 @@ public class FileStorePathFactory {
try {
Path snapshotDir = new Path(root + "/snapshot");
FileSystem fs = snapshotDir.getFileSystem();
+
+ if (!fs.exists(snapshotDir)) {
+ LOG.debug("The snapshot director '{}' is not exist.",
snapshotDir);
+ return null;
+ }
+
FileStatus[] statuses = fs.listStatus(snapshotDir);
+ if (statuses == null) {
+ throw new RuntimeException(
+ "The return value is null of the listStatus for the
snapshot directory.");
+ }
long latestId = Snapshot.FIRST_SNAPSHOT_ID - 1;
for (FileStatus status : statuses) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
index a27542c..76ebea5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
@@ -27,6 +27,9 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -34,6 +37,7 @@ import java.util.Map;
/** Utils for parsing among different types. */
public class TypeUtils {
+ @Nullable
public static Predicate partitionMapToPredicate(
Map<String, String> partition, RowType partitionType) {
List<String> fieldNames = partitionType.getFieldNames();
@@ -85,7 +89,8 @@ public class TypeUtils {
case TIME_WITHOUT_TIME_ZONE:
return BinaryStringDataUtil.toTime(str);
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return BinaryStringDataUtil.toTimestamp(str);
+ TimestampType timestampType = (TimestampType) type;
+ return BinaryStringDataUtil.toTimestamp(str, timestampType.getPrecision());
default:
throw new UnsupportedOperationException("Unsupported type " +
type.toString());
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index 07a27d3..bbec00b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
@@ -66,15 +65,6 @@ public class OperationTestUtils {
return new MergeTreeOptions(conf);
}
- private static FileStoreOptions getFileStoreOptions() {
- Configuration conf = new Configuration();
- conf.set(FileStoreOptions.BUCKET, 1);
- conf.set(
- FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
- MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
- return new FileStoreOptions(conf);
- }
-
public static FileStoreScan createScan(
FileFormat fileFormat, FileStorePathFactory pathFactory) {
return new FileStoreScanImpl(
@@ -97,7 +87,9 @@ public class OperationTestUtils {
testManifestFileFactory,
testManifestListFactory,
createScan(fileFormat, pathFactory),
- getFileStoreOptions());
+ 1,
+ MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"),
+ 30);
}
public static FileStoreWrite createWrite(
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.properties b/flink-table-store-core/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.xml b/flink-table-store-core/src/test/resources/log4j2-test.xml
deleted file mode 100644
index ae98052..0000000
--- a/flink-table-store-core/src/test/resources/log4j2-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<Configuration status="WARN">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
- </Console>
- </Appenders>
- <Loggers>
- <!-- <Logger name="org.apache.flink.table.store.file.operation" level="DEBUG" /> -->
- </Loggers>
-</Configuration>
\ No newline at end of file