This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a472c23  [FLINK-26276] Introduce ITCases with bug fixes
a472c23 is described below

commit a472c23e9e6aa0900360f3bc12b7e3e325bec318
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 1 11:48:09 2022 +0800

    [FLINK-26276] Introduce ITCases with bug fixes
    
    This closes #23
---
 flink-table-store-connector/pom.xml                |  20 ++
 .../table/store/connector/sink/StoreSink.java      |   2 +-
 .../store/connector/sink/StoreSinkWriter.java      |  22 +-
 .../sink/global/GlobalCommitterOperator.java       |  32 ++-
 .../sink/global/GlobalCommittingSink.java          |  22 +-
 .../global/GlobalCommittingSinkTranslator.java     |  24 +-
 .../table/store/connector/FileStoreITCase.java     | 291 +++++++++++++++++++++
 .../table/store/connector/SerializableRowData.java | 154 +++++++++++
 .../table/store/connector/sink/StoreSinkTest.java  |   2 +-
 .../sink/global/GlobalCommitterOperatorTest.java   |   4 +-
 .../src/test/resources/log4j2-test.properties      |  28 ++
 .../flink/table/store/file/FileStoreImpl.java      | 145 ++++++++++
 .../flink/table/store/file/FileStoreOptions.java   | 106 ++++++--
 .../store/file/manifest/ManifestFileMeta.java      |   8 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  38 +--
 .../store/file/operation/FileStoreScanImpl.java    |  72 +++--
 .../store/file/utils/FileStorePathFactory.java     |  10 +
 .../flink/table/store/file/utils/TypeUtils.java    |   7 +-
 .../store/file/operation/OperationTestUtils.java   |  14 +-
 .../src/test/resources/log4j2-test.properties      |  28 ++
 .../src/test/resources/log4j2-test.xml             |  29 --
 21 files changed, 912 insertions(+), 146 deletions(-)

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 70ccea3..f05cd72 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -121,6 +121,26 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit4.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <version>${junit5.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index c61ce0c..1b980c6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -104,7 +104,7 @@ public class StoreSink<WriterStateT, LogCommT>
     }
 
     @Override
-    public StoreGlobalCommitter<LogCommT> createCommitter() throws IOException {
+    public StoreGlobalCommitter<LogCommT> createGlobalCommitter() {
         FileStoreCommit commit = fileStore.newCommit();
         CatalogLock lock;
         if (lockFactory == null) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index dc8c993..b85999c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -128,10 +129,17 @@ public class StoreSinkWriter<WriterStateT>
     @Override
     public List<Committable> prepareCommit() throws IOException {
         List<Committable> committables = new ArrayList<>();
-        for (BinaryRowData partition : writers.keySet()) {
-            Map<Integer, RecordWriter> buckets = writers.get(partition);
-            for (Integer bucket : buckets.keySet()) {
-                RecordWriter writer = buckets.get(bucket);
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
+                writers.entrySet().iterator();
+        while (partIter.hasNext()) {
+            Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
+            BinaryRowData partition = partEntry.getKey();
+            Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
+                    partEntry.getValue().entrySet().iterator();
+            while (bucketIter.hasNext()) {
+                Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
+                int bucket = entry.getKey();
+                RecordWriter writer = entry.getValue();
                 FileCommittable committable;
                 try {
                     committable = new FileCommittable(partition, bucket, writer.prepareCommit());
@@ -149,12 +157,12 @@ public class StoreSinkWriter<WriterStateT>
                // such as yesterday's partition that no longer needs to be written.
                 if (committable.increment().newFiles().isEmpty()) {
                     closeWriter(writer);
-                    buckets.remove(bucket);
+                    bucketIter.remove();
                 }
             }
 
-            if (buckets.isEmpty()) {
-                writers.remove(partition);
+            if (partEntry.getValue().isEmpty()) {
+                partIter.remove();
             }
         }
 
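
The rewrite above exists because the old loop removed map entries while iterating over writers.keySet() and buckets.keySet(), which makes a HashMap throw ConcurrentModificationException; the fix iterates the entry sets explicitly and removes through Iterator.remove(). A minimal, self-contained Java sketch of the safe pattern (all names here are illustrative, not from the patch):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class SafeRemovalSketch {
        public static void main(String[] args) {
            Map<String, Integer> counts = new HashMap<>();
            counts.put("empty", 0);
            counts.put("busy", 3);

            // Removing through the iterator is the safe way to drop entries
            // mid-iteration; counts.remove(key) here would make the next
            // hasNext()/next() call throw ConcurrentModificationException.
            Iterator<Map.Entry<String, Integer>> it = counts.entrySet().iterator();
            while (it.hasNext()) {
                if (it.next().getValue() == 0) {
                    it.remove();
                }
            }
            System.out.println(counts); // {busy=3}
        }
    }
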
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
index ec48d27..36b1680 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
@@ -28,9 +28,11 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +60,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
      * Aggregate committables to global committables and commit the global committables to the
      * external system.
      */
-    private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+    private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
 
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -69,31 +71,36 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
     private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
 
     /** The committable's serializer. */
-    private final SimpleVersionedSerializer<GlobalCommT> committableSerializer;
+    private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>>
+            committableSerializer;
 
     /** The operator's state. */
     private ListState<GlobalCommT> streamingCommitterState;
 
+    private GlobalCommitter<CommT, GlobalCommT> committer;
+
     public GlobalCommitterOperator(
-            GlobalCommitter<CommT, GlobalCommT> globalCommitter,
-            SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
-        this.globalCommitter = checkNotNull(globalCommitter);
+            SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory,
+            SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> committableSerializer) {
+        this.committerFactory = checkNotNull(committerFactory);
         this.committableSerializer = committableSerializer;
         this.committablesPerCheckpoint = new TreeMap<>();
+        setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
+        committer = committerFactory.get();
         streamingCommitterState =
                 new SimpleVersionedListState<>(
                         context.getOperatorStateStore()
                                 .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
-                        committableSerializer);
+                        committableSerializer.get());
         List<GlobalCommT> restored = new ArrayList<>();
         streamingCommitterState.get().forEach(restored::add);
         streamingCommitterState.clear();
-        globalCommitter.commit(globalCommitter.filterRecoveredCommittables(restored));
+        committer.commit(committer.filterRecoveredCommittables(restored));
     }
 
     @Override
@@ -103,7 +110,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
         if (committables.size() > 0) {
             committablesPerCheckpoint.put(
                     context.getCheckpointId(),
-                    globalCommitter.combine(context.getCheckpointId(), committables));
+                    committer.combine(context.getCheckpointId(), committables));
         }
         streamingCommitterState.update(new ArrayList<>(committablesPerCheckpoint.values()));
     }
@@ -112,9 +119,8 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
     public void endInput() throws Exception {
         List<CommT> allCommittables = pollCommittables();
         if (!allCommittables.isEmpty()) {
-            globalCommitter.commit(
-                    Collections.singletonList(
-                            globalCommitter.combine(Long.MAX_VALUE, allCommittables)));
+            committer.commit(
+                    Collections.singletonList(committer.combine(Long.MAX_VALUE, allCommittables)));
         }
     }
 
@@ -124,7 +130,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
         LOG.info("Committing the state for checkpoint {}", checkpointId);
         NavigableMap<Long, GlobalCommT> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
-        globalCommitter.commit(new ArrayList<>(headMap.values()));
+        committer.commit(new ArrayList<>(headMap.values()));
         headMap.clear();
     }
 
@@ -138,7 +144,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
 
     @Override
     public void close() throws Exception {
-        globalCommitter.close();
+        committer.close();
         committablesPerCheckpoint.clear();
         committables.clear();
         super.close();
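
The constructor change above swaps live GlobalCommitter/serializer instances for SerializableSupplier factories, with the committer created lazily in initializeState(). The operator object itself is serialized into the job graph, so only the factory has to be Serializable; the committer (which may hold locks or connections) is built on the task manager. A hedged, self-contained sketch of the pattern (class names are illustrative):

    import org.apache.flink.util.function.SerializableSupplier;

    public class LazyResourceSketch {
        // Only the factory travels with the serialized object; the possibly
        // non-serializable resource is created at runtime.
        static class Holder<T> implements java.io.Serializable {
            private final SerializableSupplier<T> factory;
            private transient T resource;

            Holder(SerializableSupplier<T> factory) {
                this.factory = factory;
            }

            void open() { // analogous to initializeState()
                resource = factory.get();
            }

            T get() {
                return resource;
            }
        }

        public static void main(String[] args) {
            Holder<StringBuilder> holder = new Holder<>(StringBuilder::new);
            holder.open();
            System.out.println(holder.get().append("created lazily"));
        }
    }
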
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
index 66b4cd6..4b2b512 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.table.store.connector.sink.global;
 
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import java.io.IOException;
-
 /**
  * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
  * consists of a {@link SinkWriter} that performs the precommits and a {@link GlobalCommitter} that
@@ -34,22 +33,19 @@ import java.io.IOException;
  * @param <CommT> The type of the committables.
  * @param <GlobalCommT> The type of the aggregated committable.
  */
-public interface GlobalCommittingSink<InputT, CommT, GlobalCommT> extends Sink<InputT> {
+public interface GlobalCommittingSink<InputT, CommT, GlobalCommT>
+        extends TwoPhaseCommittingSink<InputT, CommT> {
 
-    /**
-     * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
-     * input.
-     */
-    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+    @Override
+    default Committer<CommT> createCommitter() {
+        throw new UnsupportedOperationException("Please create global committer.");
+    }
 
     /**
      * Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
      * through {@link GlobalCommitter#commit}.
      */
-    GlobalCommitter<CommT, GlobalCommT> createCommitter() throws IOException;
-
-    /** Returns the serializer of the committable type. */
-    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+    GlobalCommitter<CommT, GlobalCommT> createGlobalCommitter();
 
     /** Returns the serializer of the global committable type. */
     SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer();
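
The throwing default createCommitter() above is the usual idiom for refining an interface whose commit path is handled elsewhere: implementations inherit the stub and only provide createGlobalCommitter(). A self-contained sketch of the idiom (all names illustrative):

    public class DefaultStubSketch {
        interface TwoPhase<C> {
            C createCommitter();
        }

        // The refinement stubs out the per-subtask committer and leaves only
        // the global factory abstract, so a lambda can implement it.
        interface GlobalOnly<C, G> extends TwoPhase<C> {
            @Override
            default C createCommitter() {
                throw new UnsupportedOperationException("Please create global committer.");
            }

            G createGlobalCommitter();
        }

        public static void main(String[] args) {
            GlobalOnly<String, Integer> sink = () -> 42;
            System.out.println(sink.createGlobalCommitter()); // prints 42
        }
    }
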
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
index 4d83149..e547642 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 
-import java.io.IOException;
-
 /** A translator for the {@link GlobalCommittingSink}. */
 public class GlobalCommittingSinkTranslator {
 
@@ -38,12 +36,24 @@ public class GlobalCommittingSinkTranslator {
     private static final String WRITER_NAME = "Writer";
 
     public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
-            DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink)
-            throws IOException {
+            DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink) {
         TypeInformation<CommittableMessage<CommT>> commitType =
                 CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
+
+        boolean checkpointingEnabled =
+                input.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled();
+
+        // We cannot determine the mode when the execution mode is auto.
+        // We set isBatch to false and only use checkpointingEnabled to determine if we want to do
+        // the final commit.
+        // When isBatch is true, only the checkpointID is different, which has no effect on the
+        // commit operator.
+
         SingleOutputStreamOperator<CommittableMessage<CommT>> written =
-                input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink));
+                input.transform(
+                        WRITER_NAME,
+                        commitType,
+                        new SinkWriterOperatorFactory<>(sink, false, checkpointingEnabled));
 
         SingleOutputStreamOperator<Void> committed =
                 written.global()
@@ -51,8 +61,8 @@ public class GlobalCommittingSinkTranslator {
                                 GLOBAL_COMMITTER_NAME,
                                 Types.VOID,
                                 new GlobalCommitterOperator<>(
-                                        sink.createCommitter(),
-                                        sink.getGlobalCommittableSerializer()))
+                                        sink::createGlobalCommitter,
+                                        sink::getGlobalCommittableSerializer))
                         .setParallelism(1)
                         .setMaxParallelism(1);
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
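
On the flag threaded into SinkWriterOperatorFactory above: the translator only needs to know whether committables must survive until a final commit, and it derives that from the environment's checkpoint configuration. A hedged sketch of how that flag behaves (FileStoreITCase below enables checkpointing the same way):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointFlagSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // false until a checkpoint interval is configured
            System.out.println(env.getCheckpointConfig().isCheckpointingEnabled());
            env.enableCheckpointing(100);
            // true afterwards; this is the value the translator passes on
            System.out.println(env.getCheckpointConfig().isCheckpointingEnabled());
        }
    }
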
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
new file mode 100644
index 0000000..8e7aa42
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.connector.sink.StoreSink;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
+import org.apache.flink.table.store.connector.source.FileStoreSource;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_FORMAT;
+import static org.apache.flink.table.store.file.FileStoreOptions.FILE_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link FileStoreSource} and {@link StoreSink}. */
+@RunWith(Parameterized.class)
+public class FileStoreITCase extends AbstractTestBase {
+
+    private static final RowType PARTITION_TYPE =
+            new RowType(Collections.singletonList(new RowType.RowField("p", new VarCharType())));
+
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new RowType.RowField("k", new IntType())));
+
+    private static final RowType VALUE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("v", new IntType()),
+                            new RowType.RowField("p", new VarCharType()),
+                            // rename key
+                            new RowType.RowField("_k", new IntType())));
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static final DataStructureConverter<RowData, Row> CONVERTER =
+            (DataStructureConverter)
+                    DataStructureConverters.getConverter(
+                            TypeConversions.fromLogicalToDataType(VALUE_TYPE));
+
+    private static final int NUM_BUCKET = 3;
+
+    private static final List<RowData> SOURCE_DATA =
+            Arrays.asList(
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(0, StringData.fromString("p1"), 2)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p1"), 1)),
+                    wrap(GenericRowData.of(6, StringData.fromString("p2"), 1)),
+                    wrap(GenericRowData.of(3, StringData.fromString("p2"), 5)),
+                    wrap(GenericRowData.of(5, StringData.fromString("p2"), 1)));
+
+    private final boolean isBatch;
+
+    public FileStoreITCase(boolean isBatch) {
+        this.isBatch = isBatch;
+    }
+
+    @Parameterized.Parameters(name = "isBatch-{0}")
+    public static List<Boolean> getVarSeg() {
+        return Arrays.asList(true, false);
+    }
+
+    private static SerializableRowData wrap(RowData row) {
+        return new SerializableRowData(row, InternalSerializers.create(VALUE_TYPE));
+    }
+
+    @Test
+    public void testPartitioned() throws Exception {
+        innerTest(true);
+    }
+
+    @Test
+    public void testNonPartitioned() throws Exception {
+        innerTest(false);
+    }
+
+    @Test
+    public void testOverwrite() throws Exception {
+        Assume.assumeTrue(isBatch);
+
+        StreamExecutionEnvironment env = buildBatchEnv();
+        FileStore fileStore =
+                buildFileStore(buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), true);
+
+        // sink
+        DataStreamSource<RowData> finiteSource = buildTestSource(env, true);
+        write(finiteSource, fileStore, true);
+
+        // overwrite p2
+        finiteSource =
+                env.fromCollection(
+                        Collections.singletonList(
+                                wrap(GenericRowData.of(9, StringData.fromString("p2"), 5))),
+                        InternalTypeInfo.of(VALUE_TYPE));
+        Map<String, String> overwrite = new HashMap<>();
+        overwrite.put("p", "p2");
+        write(finiteSource, fileStore, true, overwrite);
+
+        // read
+        List<Row> results = read(env, fileStore, true);
+
+        Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
+        assertThat(results).containsExactlyInAnyOrder(expected);
+
+        // overwrite all
+        finiteSource =
+                env.fromCollection(
+                        Collections.singletonList(
+                                wrap(GenericRowData.of(19, StringData.fromString("p2"), 6))),
+                        InternalTypeInfo.of(VALUE_TYPE));
+        write(finiteSource, fileStore, true, new HashMap<>());
+
+        // read
+        results = read(env, fileStore, true);
+        expected = new Row[] {Row.of(19, "p2", 6)};
+        assertThat(results).containsExactlyInAnyOrder(expected);
+    }
+
+    private void innerTest(boolean partitioned) throws Exception {
+        StreamExecutionEnvironment env = isBatch ? buildBatchEnv() : buildStreamEnv();
+        FileStore fileStore =
+                buildFileStore(
+                        buildConfiguration(isBatch, TEMPORARY_FOLDER.newFolder()), partitioned);
+
+        // sink
+        DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
+        write(finiteSource, fileStore, partitioned);
+
+        // source
+        List<Row> results = read(env, fileStore, partitioned);
+
+        Row[] expected =
+                partitioned
+                        ? new Row[] {
+                            Row.of(5, "p2", 1),
+                            Row.of(3, "p2", 5),
+                            Row.of(5, "p1", 1),
+                            Row.of(0, "p1", 2)
+                        }
+                        : new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+        assertThat(results).containsExactlyInAnyOrder(expected);
+    }
+
+    public static StreamExecutionEnvironment buildStreamEnv() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+        env.setParallelism(2);
+        return env;
+    }
+
+    public static StreamExecutionEnvironment buildBatchEnv() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+        return env;
+    }
+
+    public static Configuration buildConfiguration(boolean isBatch, File folder) {
+        Configuration options = new Configuration();
+        options.set(BUCKET, NUM_BUCKET);
+        if (isBatch) {
+            options.set(FILE_PATH, folder.toURI().toString());
+        } else {
+            options.set(FILE_PATH, "fail://" + folder.getPath());
+            // FailingAtomicRenameFileSystem.setFailPossibility(20);
+        }
+        options.set(FILE_FORMAT, "avro");
+        return options;
+    }
+
+    public static FileStore buildFileStore(Configuration options, boolean partitioned) {
+        return new FileStoreImpl(
+                options,
+                "user",
+                partitioned ? PARTITION_TYPE : RowType.of(),
+                KEY_TYPE,
+                VALUE_TYPE,
+                new DeduplicateAccumulator());
+    }
+
+    public static DataStreamSource<RowData> buildTestSource(
+            StreamExecutionEnvironment env, boolean isBatch) {
+        return isBatch
+                ? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(VALUE_TYPE))
+                : env.addSource(
+                        new FiniteTestSource<>(null, SOURCE_DATA), InternalTypeInfo.of(VALUE_TYPE));
+    }
+
+    public static void write(DataStream<RowData> input, FileStore fileStore, boolean partitioned)
+            throws Exception {
+        write(input, fileStore, partitioned, null);
+    }
+
+    public static void write(
+            DataStream<RowData> input,
+            FileStore fileStore,
+            boolean partitioned,
+            @Nullable Map<String, String> overwritePartition)
+            throws Exception {
+        int[] partitions = partitioned ? new int[] {1} : new int[0];
+        int[] keys = new int[] {2};
+        StoreSink<?, ?> sink =
+                new StoreSink<>(
+                        null,
+                        fileStore,
+                        VALUE_TYPE,
+                        partitions,
+                        keys,
+                        NUM_BUCKET,
+                        null,
+                        overwritePartition);
+        input = input.keyBy(row -> row.getInt(2)); // key by
+        GlobalCommittingSinkTranslator.translate(input, sink);
+        input.getExecutionEnvironment().execute();
+    }
+
+    public static List<Row> read(
+            StreamExecutionEnvironment env, FileStore fileStore, boolean partitioned)
+            throws Exception {
+        int[] partitions = partitioned ? new int[] {1} : new int[0];
+        int[] keys = new int[] {2};
+        FileStoreSource source =
+                new FileStoreSource(fileStore, VALUE_TYPE, partitions, keys, null, null, null);
+        CloseableIterator<RowData> iterator =
+                env.fromSource(
+                                source,
+                                WatermarkStrategy.noWatermarks(),
+                                "source",
+                                InternalTypeInfo.of(VALUE_TYPE))
+                        .executeAndCollect();
+        List<Row> results = new ArrayList<>();
+        while (iterator.hasNext()) {
+            results.add(CONVERTER.toExternal(iterator.next()));
+        }
+        iterator.close();
+        return results;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SerializableRowData.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SerializableRowData.java
new file mode 100644
index 0000000..8a315f3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SerializableRowData.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/** A {@link RowData} that implements {@link Serializable}. */
+public class SerializableRowData implements RowData, Serializable {
+
+    private final TypeSerializer<RowData> serializer;
+
+    private transient RowData row;
+
+    public SerializableRowData(RowData row, TypeSerializer<RowData> serializer) {
+        this.row = row;
+        this.serializer = serializer;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        serializer.serialize(row, new DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        row = serializer.deserialize(new DataInputViewStreamWrapper(in));
+    }
+
+    @Override
+    public int getArity() {
+        return row.getArity();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return row.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind rowKind) {
+        row.setRowKind(rowKind);
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return row.isNullAt(i);
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+        return row.getBoolean(i);
+    }
+
+    @Override
+    public byte getByte(int i) {
+        return row.getByte(i);
+    }
+
+    @Override
+    public short getShort(int i) {
+        return row.getShort(i);
+    }
+
+    @Override
+    public int getInt(int i) {
+        return row.getInt(i);
+    }
+
+    @Override
+    public long getLong(int i) {
+        return row.getLong(i);
+    }
+
+    @Override
+    public float getFloat(int i) {
+        return row.getFloat(i);
+    }
+
+    @Override
+    public double getDouble(int i) {
+        return row.getDouble(i);
+    }
+
+    @Override
+    public StringData getString(int i) {
+        return row.getString(i);
+    }
+
+    @Override
+    public DecimalData getDecimal(int i, int precision, int scale) {
+        return row.getDecimal(i, precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int i, int precision) {
+        return row.getTimestamp(i, precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int i) {
+        return row.getRawValue(i);
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        return row.getBinary(i);
+    }
+
+    @Override
+    public ArrayData getArray(int i) {
+        return row.getArray(i);
+    }
+
+    @Override
+    public MapData getMap(int i) {
+        return row.getMap(i);
+    }
+
+    @Override
+    public RowData getRow(int i, int rowArity) {
+        return row.getRow(i, rowArity);
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index f469ce6..403cb6c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -179,7 +179,7 @@ public class StoreSinkTest {
     }
 
     private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
-        StoreGlobalCommitter committer = sink.createCommitter();
+        StoreGlobalCommitter committer = sink.createGlobalCommitter();
         GlobalCommittable<?> committable = committer.combine(0, fileCommittables);
 
         fileStore.expired = false;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
index 77176ef..3bbead5 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -147,7 +147,7 @@ public class GlobalCommitterOperatorTest {
         testHarness.open();
         testHarness.snapshot(1L, 1L);
         testHarness.notifyOfCompletedCheckpoint(1L);
-        assertThat(globalCommitter.getCommittedData().isEmpty()).isTrue();
+        assertThat(globalCommitter.getCommittedData()).isEmpty();
         testHarness.close();
     }
 
@@ -175,7 +175,7 @@ public class GlobalCommitterOperatorTest {
             GlobalCommitter<String, String> globalCommitter) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
                 new GlobalCommitterOperator<>(
-                        globalCommitter, StringCommittableSerializer.INSTANCE),
+                        () -> globalCommitter, () -> StringCommittableSerializer.INSTANCE),
                 CommittableMessageTypeInfo.of(
                                 (SerializableSupplier<SimpleVersionedSerializer<String>>)
                                         () -> StringCommittableSerializer.INSTANCE)
diff --git a/flink-table-store-connector/src/test/resources/log4j2-test.properties b/flink-table-store-connector/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-table-store-connector/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
new file mode 100644
index 0000000..52857fa
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
+import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
+import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
+import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
+import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.IntStream;
+
+/** File store implementation. */
+public class FileStoreImpl implements FileStore {
+
+    private final FileStoreOptions options;
+    private final String user;
+    private final RowType partitionType;
+    private final RowType keyType;
+    private final RowType valueType;
+    private final Accumulator accumulator;
+    private final GeneratedRecordComparator genRecordComparator;
+
+    public FileStoreImpl(
+            Configuration options,
+            String user,
+            RowType partitionType,
+            RowType keyType,
+            RowType valueType,
+            Accumulator accumulator) {
+        this.options = new FileStoreOptions(options);
+        this.user = user;
+        this.partitionType = partitionType;
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.accumulator = accumulator;
+        this.genRecordComparator =
+                new SortCodeGenerator(
+                                new TableConfig(),
+                                RowType.of(keyType.getChildren().toArray(new LogicalType[0])),
+                                SortUtil.getAscendingSortSpec(
+                                        IntStream.range(0, keyType.getFieldCount()).toArray()))
+                        .generateRecordComparator("KeyComparator");
+    }
+
+    private FileStorePathFactory pathFactory() {
+        return new FileStorePathFactory(
+                options.path(), partitionType, options.partitionDefaultName());
+    }
+
+    private ManifestFile.Factory manifestFileFactory() {
+        return new ManifestFile.Factory(
+                partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
+    }
+
+    private ManifestList.Factory manifestListFactory() {
+        return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
+    }
+
+    private RecordComparator newKeyComparator() {
+        return genRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
+    }
+
+    @Override
+    public FileStoreWriteImpl newWrite() {
+        return new FileStoreWriteImpl(
+                keyType,
+                valueType,
+                newKeyComparator(),
+                accumulator,
+                options.fileFormat(),
+                pathFactory(),
+                newScan(),
+                options.mergeTreeOptions());
+    }
+
+    @Override
+    public FileStoreReadImpl newRead() {
+        return new FileStoreReadImpl(
+                keyType,
+                valueType,
+                newKeyComparator(),
+                accumulator,
+                options.fileFormat(),
+                pathFactory());
+    }
+
+    @Override
+    public FileStoreCommitImpl newCommit() {
+        return new FileStoreCommitImpl(
+                user,
+                partitionType,
+                pathFactory(),
+                manifestFileFactory(),
+                manifestListFactory(),
+                newScan(),
+                options.bucket(),
+                options.manifestTargetSize(),
+                options.manifestMergeMinCount());
+    }
+
+    @Override
+    public FileStoreExpireImpl newExpire() {
+        return new FileStoreExpireImpl(
+                options.snapshotNumRetain(),
+                options.snapshotTimeRetain().toMillis(),
+                pathFactory(),
+                manifestListFactory(),
+                newScan());
+    }
+
+    @Override
+    public FileStoreScanImpl newScan() {
+        return new FileStoreScanImpl(
+                partitionType, pathFactory(), manifestFileFactory(), manifestListFactory());
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index ea6d0d6..a54526f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -20,11 +20,18 @@ package org.apache.flink.table.store.file;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
 
 /** Options for {@link FileStore}. */
-public class FileStoreOptions {
+public class FileStoreOptions implements Serializable {
 
     public static final ConfigOption<Integer> BUCKET =
             ConfigOptions.key("bucket")
@@ -32,6 +39,24 @@ public class FileStoreOptions {
                     .defaultValue(1)
                     .withDescription("Bucket number for file store.");
 
+    public static final ConfigOption<String> FILE_PATH =
+            ConfigOptions.key("file.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The file path of the table store in the filesystem.");
+
+    public static final ConfigOption<String> FILE_FORMAT =
+            ConfigOptions.key("file.format")
+                    .stringType()
+                    .defaultValue("orc")
+                    .withDescription("Specify the message format of data files.");
+
+    public static final ConfigOption<String> MANIFEST_FORMAT =
+            ConfigOptions.key("manifest.format")
+                    .stringType()
+                    .defaultValue("avro")
+                    .withDescription("Specify the message format of manifest files.");
+
     public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
             ConfigOptions.key("manifest.target-file-size")
                     .memoryType()
@@ -46,21 +71,72 @@
                             "To avoid frequent manifest merges, this parameter specifies the minimum number "
                                     + "of ManifestFileMeta to merge.");
 
-    public final int bucket;
-    public final MemorySize manifestSuggestedSize;
-    public final int manifestMergeMinCount;
+    public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
+            key("partition.default-name")
+                    .stringType()
+                    .defaultValue("__DEFAULT_PARTITION__")
+                    .withDescription(
+                            "The default partition name in case the dynamic partition"
+                                    + " column value is null/empty string.");
+
+    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED =
+            ConfigOptions.key("snapshot.num-retained")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription("The maximum number of completed snapshots to retain.");
+
+    public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
+            ConfigOptions.key("snapshot.time-retained")
+                    .durationType()
+                    .defaultValue(Duration.ofDays(1))
+                    .withDescription("The maximum time of completed snapshots to retain.");
+
+    private final Configuration options;
+
+    public FileStoreOptions(Configuration options) {
+        this.options = options;
+        // TODO validate all keys
+    }
+
+    public int bucket() {
+        return options.get(BUCKET);
+    }
+
+    public Path path() {
+        return new Path(options.get(FILE_PATH));
+    }
+
+    public FileFormat fileFormat() {
+        return FileFormat.fromTableOptions(
+                Thread.currentThread().getContextClassLoader(), options, FILE_FORMAT);
+    }
+
+    public FileFormat manifestFormat() {
+        return FileFormat.fromTableOptions(
+                Thread.currentThread().getContextClassLoader(), options, MANIFEST_FORMAT);
+    }
+
+    public MemorySize manifestTargetSize() {
+        return options.get(MANIFEST_TARGET_FILE_SIZE);
+    }
+
+    public String partitionDefaultName() {
+        return options.get(PARTITION_DEFAULT_NAME);
+    }
+
+    public MergeTreeOptions mergeTreeOptions() {
+        return new MergeTreeOptions(options);
+    }
+
+    public int snapshotNumRetain() {
+        return options.get(SNAPSHOT_NUM_RETAINED);
+    }
 
-    public FileStoreOptions(
-            int bucket, MemorySize manifestSuggestedSize, int manifestMergeMinCount) {
-        this.bucket = bucket;
-        this.manifestSuggestedSize = manifestSuggestedSize;
-        this.manifestMergeMinCount = manifestMergeMinCount;
+    public Duration snapshotTimeRetain() {
+        return options.get(SNAPSHOT_TIME_RETAINED);
     }
 
-    public FileStoreOptions(ReadableConfig config) {
-        this(
-                config.get(BUCKET),
-                config.get(MANIFEST_TARGET_FILE_SIZE),
-                config.get(MANIFEST_MERGE_MIN_COUNT));
+    public int manifestMergeMinCount() {
+        return options.get(MANIFEST_MERGE_MIN_COUNT);
     }
 }
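
The FileStoreOptions rewrite above drops the eagerly-parsed public fields in favor of a Serializable wrapper around the raw Configuration that resolves each value on demand. A hedged usage sketch (the path value is illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.file.FileStoreOptions;

    public class OptionsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(FileStoreOptions.BUCKET, 4);
            conf.set(FileStoreOptions.FILE_PATH, "/tmp/table-store");

            // The wrapper stays serializable because it carries only the
            // Configuration; accessors materialize values when called.
            FileStoreOptions options = new FileStoreOptions(conf);
            System.out.println(options.bucket()); // 4
            System.out.println(options.path());   // /tmp/table-store
        }
    }
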
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index b3ad481..be1d629 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -170,10 +170,12 @@ public class ManifestFileMeta {
             }
 
             // both size and count conditions not satisfied, create new file from entries
-            ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
             result.addAll(candidate);
-            newMetas.add(newManifestFileMeta);
-            result.add(newManifestFileMeta);
+            if (entries.size() > 0) {
+                ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
+                newMetas.add(newManifestFileMeta);
+                result.add(newManifestFileMeta);
+            }
         } catch (Throwable e) {
             // exception occurs, clean up and rethrow
             for (ManifestFileMeta manifest : newMetas) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 316b6c4..fe20b45 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
@@ -77,7 +77,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
     private final FileStoreScan scan;
-    private final FileStoreOptions fileStoreOptions;
+    private final int numBucket;
+    private final MemorySize manifestTargetSize;
+    private final int manifestMergeMinCount;
 
     @Nullable private Lock lock;
 
@@ -88,7 +90,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             FileStoreScan scan,
-            FileStoreOptions fileStoreOptions) {
+            int numBucket,
+            MemorySize manifestTargetSize,
+            int manifestMergeMinCount) {
         this.commitUser = commitUser;
         this.partitionType = partitionType;
         this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
@@ -96,7 +100,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
         this.scan = scan;
-        this.fileStoreOptions = fileStoreOptions;
+        this.numBucket = numBucket;
+        this.manifestTargetSize = manifestTargetSize;
+        this.manifestMergeMinCount = manifestMergeMinCount;
 
         this.lock = null;
     }
@@ -171,14 +177,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
         // sanity check, all changes must be done within the given partition
         Predicate partitionFilter = TypeUtils.partitionMapToPredicate(partition, partitionType);
-        for (ManifestEntry entry : appendChanges) {
-            if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
-                throw new IllegalArgumentException(
-                        "Trying to overwrite partition "
-                                + partition.toString()
-                                + ", but the changes in "
-                                + pathFactory.getPartitionString(entry.partition())
-                                + " does not belong to this partition");
+        if (partitionFilter != null) {
+            for (ManifestEntry entry : appendChanges) {
+                if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
+                    throw new IllegalArgumentException(
+                            "Trying to overwrite partition "
+                                    + partition
+                                    + ", but the changes in "
+                                    + pathFactory.getPartitionString(entry.partition())
+                                    + " does not belong to this partition");
+                }
             }
         }
         // overwrite new files
@@ -258,7 +266,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                                         kind,
                                                         entryWithPartition.getKey(),
                                                         entryWithBucket.getKey(),
-                                                        fileStoreOptions.bucket,
+                                                        numBucket,
                                                         file))
                                 .collect(Collectors.toList()));
             }
@@ -307,8 +315,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             oldMetas,
                             changes,
                             manifestFile,
-                            fileStoreOptions.manifestSuggestedSize.getBytes(),
-                            fileStoreOptions.manifestMergeMinCount));
+                            manifestTargetSize.getBytes(),
+                            manifestMergeMinCount));
             // prepare snapshot file
             manifestListName = manifestList.write(newMetas);
             newSnapshot =
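
The null check added above exists because TypeUtils.partitionMapToPredicate is now @Nullable (see the
TypeUtils hunk further below). A self-contained sketch of the contract the commit appears to rely on,
assuming that an empty partition spec (a full-table overwrite) is what yields null; the class name and
the java.util predicate type below are stand-ins, not code from this repository:

    import java.util.Map;
    import java.util.function.Predicate;

    // Illustrative mirror of the @Nullable contract; the real type is
    // org.apache.flink.table.store.file.predicate.Predicate, stubbed here
    // with java.util.function.Predicate to keep the sketch runnable.
    final class PartitionFilterSketch {

        static Predicate<Object[]> partitionMapToPredicate(Map<String, String> partition) {
            if (partition.isEmpty()) {
                return null; // empty spec = whole-table overwrite, nothing to filter
            }
            // the real code builds a field-by-field equality predicate here
            return row -> true;
        }

        public static void main(String[] args) {
            Predicate<Object[]> filter = partitionMapToPredicate(Map.of());
            if (filter != null) {
                // only a partial overwrite runs the per-entry partition check
            } else {
                System.out.println("full-table overwrite: sanity check skipped");
            }
        }
    }
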
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index b75dc35..580d5af 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,12 +55,13 @@ public class FileStoreScanImpl implements FileStoreScan {
     private final ManifestFile.Factory manifestFileFactory;
     private final ManifestList manifestList;
 
-    private Long snapshotId;
-    private List<ManifestFileMeta> manifests;
     private Predicate partitionFilter;
     private Predicate keyFilter;
     private Predicate valueFilter;
-    private Integer bucket;
+
+    private Long specifiedSnapshotId = null;
+    private Integer specifiedBucket = null;
+    private List<ManifestFileMeta> specifiedManifests = null;
 
     public FileStoreScanImpl(
             RowType partitionType,
@@ -70,9 +72,6 @@ public class FileStoreScanImpl implements FileStoreScan {
         this.pathFactory = pathFactory;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
-
-        this.snapshotId = null;
-        this.manifests = new ArrayList<>();
     }
 
     @Override
@@ -122,50 +121,54 @@ public class FileStoreScanImpl implements FileStoreScan {
 
     @Override
     public FileStoreScan withBucket(int bucket) {
-        this.bucket = bucket;
+        this.specifiedBucket = bucket;
         return this;
     }
 
     @Override
     public FileStoreScan withSnapshot(long snapshotId) {
-        this.snapshotId = snapshotId;
-        Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
-        this.manifests = manifestList.read(snapshot.manifestList());
+        if (specifiedManifests != null) {
+            throw new IllegalStateException("Cannot set both snapshot id and manifests.");
+        }
+        this.specifiedSnapshotId = snapshotId;
         return this;
     }
 
     @Override
     public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
-        this.manifests = manifests;
+        if (specifiedSnapshotId != null) {
+            throw new IllegalStateException("Cannot set both snapshot id and manifests.");
+        }
+        this.specifiedManifests = manifests;
         return this;
     }
 
     @Override
     public Plan plan() {
-        List<ManifestEntry> files = scan();
-
-        return new Plan() {
-            @Nullable
-            @Override
-            public Long snapshotId() {
-                return snapshotId;
+        List<ManifestFileMeta> manifests = specifiedManifests;
+        Long snapshotId = specifiedSnapshotId;
+        if (manifests == null) {
+            if (snapshotId == null) {
+                snapshotId = pathFactory.latestSnapshotId();
             }
-
-            @Override
-            public List<ManifestEntry> files() {
-                return files;
+            if (snapshotId == null) {
+                manifests = Collections.emptyList();
+            } else {
+                Snapshot snapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+                manifests = manifestList.read(snapshot.manifestList());
             }
-        };
-    }
+        }
+
+        final Long readSnapshot = snapshotId;
+        final List<ManifestFileMeta> readManifests = manifests;
 
-    private List<ManifestEntry> scan() {
         List<ManifestEntry> entries;
         try {
             entries =
                     FileUtils.COMMON_IO_FORK_JOIN_POOL
                             .submit(
                                     () ->
-                                            manifests
+                                            readManifests
                                                     .parallelStream()
                                                     .filter(this::filterManifestFileMeta)
                                                     .flatMap(m -> readManifestFileMeta(m).stream())
@@ -201,7 +204,20 @@ public class FileStoreScanImpl implements FileStoreScan {
                             "Unknown value kind " + entry.kind().name());
             }
         }
-        return new ArrayList<>(map.values());
+        List<ManifestEntry> files = new ArrayList<>(map.values());
+
+        return new Plan() {
+            @Nullable
+            @Override
+            public Long snapshotId() {
+                return readSnapshot;
+            }
+
+            @Override
+            public List<ManifestEntry> files() {
+                return files;
+            }
+        };
     }
 
     private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
@@ -216,7 +232,7 @@ public class FileStoreScanImpl implements FileStoreScan {
         //  SstFile.RollingFile#finish
         return (partitionFilter == null
                         || partitionFilter.test(partitionConverter.convert(entry.partition())))
-                && (bucket == null || entry.bucket() == bucket);
+                && (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
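
Taken together, these hunks move snapshot resolution out of withSnapshot() and into plan(): the latest
snapshot is looked up at planning time, and a table with no snapshot yet produces an empty plan instead
of failing. A condensed, stand-alone sketch of that resolution order (manifests are stubbed as strings
purely to keep the sketch runnable; only the control flow mirrors the diff):

    import java.util.Collections;
    import java.util.List;

    // Stand-alone analogue of FileStoreScanImpl#plan's new resolution order.
    final class ScanPlanSketch {

        static List<String> resolveManifests(
                Long specifiedSnapshotId, List<String> specifiedManifests, Long latestSnapshotId) {
            List<String> manifests = specifiedManifests;
            Long snapshotId = specifiedSnapshotId;
            if (manifests == null) {
                if (snapshotId == null) {
                    snapshotId = latestSnapshotId; // resolved lazily, at plan time
                }
                manifests =
                        snapshotId == null
                                ? Collections.emptyList() // empty table: empty plan, no failure
                                : List.of("manifest-list-of-snapshot-" + snapshotId);
            }
            return manifests;
        }

        public static void main(String[] args) {
            System.out.println(resolveManifests(null, null, null)); // [] for an empty table
            System.out.println(resolveManifests(null, null, 5L));   // latest snapshot
            System.out.println(resolveManifests(3L, null, 5L));     // pinned snapshot
        }
    }
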
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index ab6a384..be17080 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -130,7 +130,17 @@ public class FileStorePathFactory {
         try {
             Path snapshotDir = new Path(root + "/snapshot");
             FileSystem fs = snapshotDir.getFileSystem();
+
+            if (!fs.exists(snapshotDir)) {
+                LOG.debug("The snapshot directory '{}' does not exist.", snapshotDir);
+                return null;
+            }
+
             FileStatus[] statuses = fs.listStatus(snapshotDir);
+            if (statuses == null) {
+                throw new RuntimeException(
+                        "listStatus of the snapshot directory returned null: " + snapshotDir);
+            }
 
             long latestId = Snapshot.FIRST_SNAPSHOT_ID - 1;
             for (FileStatus status : statuses) {
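
A stand-alone analogue of the guarded lookup in latestSnapshotId, using java.nio in place of Flink's
FileSystem so the sketch runs on its own; the "snapshot-<id>" file naming is an assumption, not taken
from this hunk:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    // Mirrors the new contract of FileStorePathFactory#latestSnapshotId:
    //   - missing snapshot directory -> null (table was never committed to)
    //   - otherwise                  -> highest id among "snapshot-<id>" files
    final class LatestSnapshotSketch {

        static Long latestSnapshotId(Path snapshotDir) throws IOException {
            if (!Files.exists(snapshotDir)) {
                return null; // no snapshots yet; callers treat this as an empty table
            }
            try (Stream<Path> files = Files.list(snapshotDir)) {
                return files.map(p -> p.getFileName().toString())
                        .filter(name -> name.startsWith("snapshot-"))
                        .map(name -> Long.parseLong(name.substring("snapshot-".length())))
                        .max(Long::compare)
                        .orElse(null);
            }
        }
    }
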
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
index a27542c..76ebea5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
@@ -27,6 +27,9 @@ import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
@@ -34,6 +37,7 @@ import java.util.Map;
 /** Utils for parsing among different types. */
 public class TypeUtils {
 
+    @Nullable
     public static Predicate partitionMapToPredicate(
             Map<String, String> partition, RowType partitionType) {
         List<String> fieldNames = partitionType.getFieldNames();
@@ -85,7 +89,8 @@ public class TypeUtils {
             case TIME_WITHOUT_TIME_ZONE:
                 return BinaryStringDataUtil.toTime(str);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return BinaryStringDataUtil.toTimestamp(str);
+                TimestampType timestampType = (TimestampType) type;
+                return BinaryStringDataUtil.toTimestamp(str, timestampType.getPrecision());
             default:
                 throw new UnsupportedOperationException("Unsupported type " + type.toString());
         }
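
A usage sketch of the precision-aware overload this hunk switches to, assuming Flink's
BinaryStringDataUtil is on the classpath; the literal timestamp value is illustrative:

    import org.apache.flink.table.data.TimestampData;
    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.data.binary.BinaryStringDataUtil;

    // Parsing the same partition-value string as TIMESTAMP(3) vs TIMESTAMP(6):
    // the declared precision is now forwarded to the parser, which the
    // one-arg overload could not convey.
    public class TimestampParseSketch {
        public static void main(String[] args) {
            BinaryStringData str = BinaryStringData.fromString("2022-03-01 11:48:09.123456");
            TimestampData millis = BinaryStringDataUtil.toTimestamp(str, 3);
            TimestampData micros = BinaryStringDataUtil.toTimestamp(str, 6);
            System.out.println(millis + " / " + micros);
        }
    }
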
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index 07a27d3..bbec00b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
@@ -66,15 +65,6 @@ public class OperationTestUtils {
         return new MergeTreeOptions(conf);
     }
 
-    private static FileStoreOptions getFileStoreOptions() {
-        Configuration conf = new Configuration();
-        conf.set(FileStoreOptions.BUCKET, 1);
-        conf.set(
-                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
-                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
-        return new FileStoreOptions(conf);
-    }
-
     public static FileStoreScan createScan(
             FileFormat fileFormat, FileStorePathFactory pathFactory) {
         return new FileStoreScanImpl(
@@ -97,7 +87,9 @@ public class OperationTestUtils {
                 testManifestFileFactory,
                 testManifestListFactory,
                 createScan(fileFormat, pathFactory),
-                getFileStoreOptions());
+                1,
+                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"),
+                30);
     }
 
     public static FileStoreWrite createWrite(
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.properties b/flink-table-store-core/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.xml b/flink-table-store-core/src/test/resources/log4j2-test.xml
deleted file mode 100644
index ae98052..0000000
--- a/flink-table-store-core/src/test/resources/log4j2-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<Configuration status="WARN">
-    <Appenders>
-        <Console name="Console" target="SYSTEM_OUT">
-            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
-        </Console>
-    </Appenders>
-    <Loggers>
-        <!-- <Logger name="org.apache.flink.table.store.file.operation" level="DEBUG" /> -->
-    </Loggers>
-</Configuration>
\ No newline at end of file
