This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f950a3e  Flink: Add operator to collect data files and append to a 
table (#1185)
f950a3e is described below

commit f950a3e63c98e88b1f8905faacf33a1d5d31e4c0
Author: openinx <[email protected]>
AuthorDate: Sat Aug 29 01:19:33 2020 +0800

    Flink: Add operator to collect data files and append to a table (#1185)
---
 .../org/apache/iceberg/flink/IcebergSinkUtil.java  |  81 ----
 .../iceberg/flink/data/FlinkParquetWriters.java    |   3 +-
 .../org/apache/iceberg/flink/sink/FlinkSink.java   | 220 +++++++++
 .../iceberg/flink/sink/IcebergFilesCommitter.java  | 229 ++++++++++
 .../flink/{ => sink}/IcebergStreamWriter.java      |   6 +-
 .../flink/{ => sink}/PartitionedFanoutWriter.java  |   2 +-
 .../flink/{ => sink}/RowDataTaskWriterFactory.java |  42 +-
 .../iceberg/flink/{ => sink}/RowDataWrapper.java   |   2 +-
 .../flink/{ => sink}/TaskWriterFactory.java        |   2 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  69 ++-
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   | 178 ++++++++
 .../flink/sink/TestIcebergFilesCommitter.java      | 505 +++++++++++++++++++++
 .../flink/{ => sink}/TestIcebergStreamWriter.java  |  10 +-
 .../flink/{ => sink}/TestRowDataPartitionKey.java  |   3 +-
 .../iceberg/flink/{ => sink}/TestTaskWriters.java  |   8 +-
 15 files changed, 1232 insertions(+), 128 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java 
b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java
deleted file mode 100644
index 8d80342..0000000
--- a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.flink;
-
-import java.util.Locale;
-import java.util.Map;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
-class IcebergSinkUtil {
-  private IcebergSinkUtil() {
-  }
-
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, 
TableSchema requestedSchema) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be 
null");
-
-    RowType flinkSchema;
-    if (requestedSchema != null) {
-      // Convert the flink schema to iceberg schema firstly, then reassign ids 
to match the existing iceberg schema.
-      Schema writeSchema = 
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
-      TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
-
-      // We use this flink schema to read values from RowData. The flink's 
TINYINT and SMALLINT will be promoted to
-      // iceberg INTEGER, that means if we use iceberg's table schema to read 
TINYINT (backend by 1 'byte'), we will
-      // read 4 bytes rather than 1 byte, it will mess up the byte array in 
BinaryRowData. So here we must use flink
-      // schema.
-      flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
-    } else {
-      flinkSchema = FlinkSchemaUtil.convert(table.schema());
-    }
-
-    Map<String, String> props = table.properties();
-    long targetFileSize = getTargetFileSizeBytes(props);
-    FileFormat fileFormat = getFileFormat(props);
-
-    TaskWriterFactory<RowData> taskWriterFactory = new 
RowDataTaskWriterFactory(table.schema(), flinkSchema,
-        table.spec(), table.locationProvider(), table.io(), 
table.encryption(), targetFileSize, fileFormat, props);
-
-    return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
-  }
-
-  private static FileFormat getFileFormat(Map<String, String> properties) {
-    String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, 
DEFAULT_FILE_FORMAT_DEFAULT);
-    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
-  }
-
-  private static long getTargetFileSizeBytes(Map<String, String> properties) {
-    return PropertyUtil.propertyAsLong(properties,
-        WRITE_TARGET_FILE_SIZE_BYTES,
-        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-  }
-}
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java 
b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 1639a44..c91b659 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -51,7 +51,7 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
-class FlinkParquetWriters {
+public class FlinkParquetWriters {
   private FlinkParquetWriters() {
   }
 
@@ -207,6 +207,7 @@ class FlinkParquetWriters {
         " wrong precision %s", precision);
     return new IntegerDecimalWriter(desc, precision, scale);
   }
+
   private static ParquetValueWriters.PrimitiveWriter<DecimalData> 
decimalAsLong(ColumnDescriptor desc,
                                                                                
 int precision, int scale) {
     Preconditions.checkArgument(precision <= 18, "Cannot write decimal value 
as long with precision larger than 18, " +
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
new file mode 100644
index 0000000..96ce0ba
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+public class FlinkSink {
+
+  private static final String ICEBERG_STREAM_WRITER_NAME = 
IcebergStreamWriter.class.getSimpleName();
+  private static final String ICEBERG_FILES_COMMITTER_NAME = 
IcebergFilesCommitter.class.getSimpleName();
+
+  private FlinkSink() {
+  }
+
+  /**
+   * Initialize a {@link Builder} to export the data from generic input data 
stream into iceberg table. We use
+   * {@link RowData} inside the sink connector, so users need to provide a 
mapper function and a
+   * {@link TypeInformation} to convert those generic records to a RowData 
DataStream.
+   *
+   * @param input      the generic source input data stream.
+   * @param mapper     function to convert the generic data to {@link RowData}
+   * @param outputType to define the {@link TypeInformation} for the input 
data.
+   * @param <T>        the data type of records.
+   * @return {@link Builder} to connect the iceberg table.
+   */
+  public static <T> Builder builderFor(DataStream<T> input,
+                                       MapFunction<T, RowData> mapper,
+                                       TypeInformation<RowData> outputType) {
+    DataStream<RowData> dataStream = input.map(mapper, outputType);
+    return forRowData(dataStream);
+  }
+
+  /**
+   * Initialize a {@link Builder} to export the data from input data stream 
with {@link Row}s into iceberg table. We use
+   * {@link RowData} inside the sink connector, so users need to provide a 
{@link TableSchema} for builder to convert
+   * those {@link Row}s to a {@link RowData} DataStream.
+   *
+   * @param input       the source input data stream with {@link Row}s.
+   * @param tableSchema defines the {@link TypeInformation} for input data.
+   * @return {@link Builder} to connect the iceberg table.
+   */
+  public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) 
{
+    RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+    DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
+
+    DataFormatConverters.RowConverter rowConverter = new 
DataFormatConverters.RowConverter(fieldDataTypes);
+    return builderFor(input, rowConverter::toInternal, 
RowDataTypeInfo.of(rowType))
+        .tableSchema(tableSchema);
+  }
+
+  /**
+   * Initialize a {@link Builder} to export the data from input data stream 
with {@link RowData}s into iceberg table.
+   *
+   * @param input the source input data stream with {@link RowData}s.
+   * @return {@link Builder} to connect the iceberg table.
+   */
+  public static Builder forRowData(DataStream<RowData> input) {
+    return new Builder().forRowData(input);
+  }
+
+  public static class Builder {
+    private DataStream<RowData> rowDataInput = null;
+    private TableLoader tableLoader;
+    private Configuration hadoopConf;
+    private Table table;
+    private TableSchema tableSchema;
+
+    private Builder() {
+    }
+
+    private Builder forRowData(DataStream<RowData> newRowDataInput) {
+      this.rowDataInput = newRowDataInput;
+      return this;
+    }
+
+    /**
+     * This iceberg {@link Table} instance is used for initializing {@link 
IcebergStreamWriter} which will write all
+     * the records into {@link DataFile}s and emit them to downstream 
operator. Providing a table would avoid so many
+     * table loading from each separate task.
+     *
+     * @param newTable the loaded iceberg table instance.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder table(Table newTable) {
+      this.table = newTable;
+      return this;
+    }
+
+    /**
+     * The table loader is used for loading tables in {@link 
IcebergFilesCommitter} lazily, we need this loader because
+     * {@link Table} is not serializable and could not just use the loaded 
table from Builder#table in the remote task
+     * manager.
+     *
+     * @param newTableLoader to load iceberg table inside tasks.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder tableLoader(TableLoader newTableLoader) {
+      this.tableLoader = newTableLoader;
+      return this;
+    }
+
+    public Builder hadoopConf(Configuration newHadoopConf) {
+      this.hadoopConf = newHadoopConf;
+      return this;
+    }
+
+    public Builder tableSchema(TableSchema newTableSchema) {
+      this.tableSchema = newTableSchema;
+      return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public DataStreamSink<RowData> build() {
+      Preconditions.checkArgument(rowDataInput != null,
+          "Please use forRowData() to initialize the input DataStream.");
+      Preconditions.checkNotNull(table, "Table shouldn't be null");
+      Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be 
null");
+      Preconditions.checkNotNull(hadoopConf, "Hadoop configuration shouldn't 
be null");
+
+      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, 
tableSchema);
+      IcebergFilesCommitter filesCommitter = new 
IcebergFilesCommitter(tableLoader, hadoopConf);
+
+      DataStream<Void> returnStream = rowDataInput
+          .transform(ICEBERG_STREAM_WRITER_NAME, 
TypeInformation.of(DataFile.class), streamWriter)
+          .setParallelism(rowDataInput.getParallelism())
+          .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+          .setParallelism(1)
+          .setMaxParallelism(1);
+
+      return returnStream.addSink(new DiscardingSink())
+          .name(String.format("IcebergSink %s", table.toString()))
+          .setParallelism(1);
+    }
+  }
+
+  static IcebergStreamWriter<RowData> createStreamWriter(Table table, 
TableSchema requestedSchema) {
+    Preconditions.checkArgument(table != null, "Iceberg table should't be 
null");
+
+    RowType flinkSchema;
+    if (requestedSchema != null) {
+      // Convert the flink schema to iceberg schema firstly, then reassign ids 
to match the existing iceberg schema.
+      Schema writeSchema = 
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
+      TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
+
+      // We use this flink schema to read values from RowData. The flink's 
TINYINT and SMALLINT will be promoted to
+      // iceberg INTEGER, that means if we use iceberg's table schema to read 
TINYINT (backend by 1 'byte'), we will
+      // read 4 bytes rather than 1 byte, it will mess up the byte array in 
BinaryRowData. So here we must use flink
+      // schema.
+      flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
+    } else {
+      flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    }
+
+    Map<String, String> props = table.properties();
+    long targetFileSize = getTargetFileSizeBytes(props);
+    FileFormat fileFormat = getFileFormat(props);
+
+    TaskWriterFactory<RowData> taskWriterFactory = new 
RowDataTaskWriterFactory(table.schema(), flinkSchema,
+        table.spec(), table.locationProvider(), table.io(), 
table.encryption(), targetFileSize, fileFormat, props);
+
+    return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
+  }
+
+  private static FileFormat getFileFormat(Map<String, String> properties) {
+    String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, 
DEFAULT_FILE_FORMAT_DEFAULT);
+    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
+
+  private static long getTargetFileSizeBytes(Map<String, String> properties) {
+    return PropertyUtil.propertyAsLong(properties,
+        WRITE_TARGET_FILE_SIZE_BYTES,
+        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
new file mode 100644
index 0000000..e0cfbaf
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IcebergFilesCommitter extends AbstractStreamOperator<Void>
+    implements OneInputStreamOperator<DataFile, Void>, BoundedOneInput {
+
+  private static final long serialVersionUID = 1L;
+  private static final long INITIAL_CHECKPOINT_ID = -1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergFilesCommitter.class);
+  private static final String FLINK_JOB_ID = "flink.job-id";
+
+  // The max checkpoint id we've committed to iceberg table. As the flink's 
checkpoint is always increasing, so we could
+  // correctly commit all the data files whose checkpoint id is greater than 
the max committed one to iceberg table, for
+  // avoiding committing the same data files twice. This id will be attached 
to iceberg's meta when committing the
+  // iceberg transaction.
+  private static final String MAX_COMMITTED_CHECKPOINT_ID = 
"flink.max-committed-checkpoint-id";
+
+  // TableLoader to load iceberg table lazily.
+  private final TableLoader tableLoader;
+  private final SerializableConfiguration hadoopConf;
+
+  // A sorted map to maintain the completed data files for each pending 
checkpointId (which have not been committed
+  // to iceberg table). We need a sorted map here because there's possible 
that few checkpoints snapshot failed, for
+  // example: the 1st checkpoint have 2 data files <1, <file0, file1>>, the 
2st checkpoint have 1 data files
+  // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of 
network/disk failure etc, while we don't expect
+  // any data loss in iceberg table. So we keep the finished files <1, <file0, 
file1>> in memory and retry to commit
+  // iceberg table when the next checkpoint happen.
+  private final NavigableMap<Long, List<DataFile>> dataFilesPerCheckpoint = 
Maps.newTreeMap();
+
+  // The data files cache for current checkpoint. Once the snapshot barrier 
received, it will be flushed to the
+  // 'dataFilesPerCheckpoint'.
+  private final List<DataFile> dataFilesOfCurrentCheckpoint = 
Lists.newArrayList();
+
+  // It will have an unique identifier for one job.
+  private transient String flinkJobId;
+  private transient Table table;
+  private transient long maxCommittedCheckpointId;
+
+  // All pending checkpoints states for this function.
+  private static final ListStateDescriptor<SortedMap<Long, List<DataFile>>> 
STATE_DESCRIPTOR = buildStateDescriptor();
+  private transient ListState<SortedMap<Long, List<DataFile>>> 
checkpointsState;
+
+  IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf) {
+    this.tableLoader = tableLoader;
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    this.flinkJobId = 
getContainingTask().getEnvironment().getJobID().toString();
+
+    // Open the table loader and load the table.
+    this.tableLoader.open(hadoopConf.get());
+    this.table = tableLoader.loadTable();
+    this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+    this.checkpointsState = 
context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+    if (context.isRestored()) {
+      this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, 
flinkJobId);
+      // In the restoring path, it should have one valid snapshot for current 
flink job at least, so the max committed
+      // checkpoint id should be positive. If it's not positive, that means 
someone might have removed or expired the
+      // iceberg snapshot, in that case we should throw an exception in case 
of committing duplicated data files into
+      // the iceberg table.
+      Preconditions.checkState(maxCommittedCheckpointId != 
INITIAL_CHECKPOINT_ID,
+          "There should be an existing iceberg snapshot for current flink job: 
%s", flinkJobId);
+
+      SortedMap<Long, List<DataFile>> restoredDataFiles = 
checkpointsState.get().iterator().next();
+      // Only keep the uncommitted data files in the cache.
+      
this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId
 + 1));
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    long checkpointId = context.getCheckpointId();
+    LOG.info("Start to flush snapshot state to state backend, table: {}, 
checkpointId: {}", table, checkpointId);
+
+    // Update the checkpoint state.
+    dataFilesPerCheckpoint.put(checkpointId, 
ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+
+    // Reset the snapshot state to the latest state.
+    checkpointsState.clear();
+    checkpointsState.add(dataFilesPerCheckpoint);
+
+    // Clear the local buffer for current checkpoint.
+    dataFilesOfCurrentCheckpoint.clear();
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    super.notifyCheckpointComplete(checkpointId);
+    // It's possible that we have the following events:
+    //   1. snapshotState(ckpId);
+    //   2. snapshotState(ckpId+1);
+    //   3. notifyCheckpointComplete(ckpId+1);
+    //   4. notifyCheckpointComplete(ckpId);
+    // For step#4, we don't need to commit iceberg table again because in 
step#3 we've committed all the files,
+    // Besides, we need to maintain the max-committed-checkpoint-id to be 
increasing.
+    if (checkpointId > maxCommittedCheckpointId) {
+      commitUpToCheckpoint(checkpointId);
+      this.maxCommittedCheckpointId = checkpointId;
+    }
+  }
+
+  private void commitUpToCheckpoint(long checkpointId) {
+    NavigableMap<Long, List<DataFile>> pendingFileMap = 
dataFilesPerCheckpoint.headMap(checkpointId, true);
+
+    List<DataFile> pendingDataFiles = Lists.newArrayList();
+    for (List<DataFile> dataFiles : pendingFileMap.values()) {
+      pendingDataFiles.addAll(dataFiles);
+    }
+
+    AppendFiles appendFiles = table.newAppend();
+    pendingDataFiles.forEach(appendFiles::appendFile);
+    appendFiles.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+    appendFiles.set(FLINK_JOB_ID, flinkJobId);
+    appendFiles.commit();
+
+    // Clear the committed data files from dataFilesPerCheckpoint.
+    pendingFileMap.clear();
+  }
+
+  @Override
+  public void processElement(StreamRecord<DataFile> element) {
+    this.dataFilesOfCurrentCheckpoint.add(element.getValue());
+  }
+
+  @Override
+  public void endInput() {
+    // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
+    dataFilesPerCheckpoint.put(Long.MAX_VALUE, 
ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+    dataFilesOfCurrentCheckpoint.clear();
+
+    commitUpToCheckpoint(Long.MAX_VALUE);
+  }
+
+  @Override
+  public void dispose() throws Exception {
+    if (tableLoader != null) {
+      tableLoader.close();
+    }
+  }
+
+  private static ListStateDescriptor<SortedMap<Long, List<DataFile>>> 
buildStateDescriptor() {
+    Comparator<Long> longComparator = 
Comparators.forType(Types.LongType.get());
+    // Construct a ListTypeInfo.
+    ListTypeInfo<DataFile> dataFileListTypeInfo = new 
ListTypeInfo<>(TypeInformation.of(DataFile.class));
+    // Construct a SortedMapTypeInfo.
+    SortedMapTypeInfo<Long, List<DataFile>> sortedMapTypeInfo = new 
SortedMapTypeInfo<>(
+        BasicTypeInfo.LONG_TYPE_INFO, dataFileListTypeInfo, longComparator
+    );
+    return new ListStateDescriptor<>("iceberg-files-committer-state", 
sortedMapTypeInfo);
+  }
+
+  static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+    Snapshot snapshot = table.currentSnapshot();
+    long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+    while (snapshot != null) {
+      Map<String, String> summary = snapshot.summary();
+      String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+      if (flinkJobId.equals(snapshotFlinkJobId)) {
+        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+        if (value != null) {
+          lastCommittedCheckpointId = Long.parseLong(value);
+          break;
+        }
+      }
+      Long parentSnapshotId = snapshot.parentId();
+      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : 
null;
+    }
+
+    return lastCommittedCheckpointId;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
similarity index 94%
rename from 
flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
rename to 
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
index e18c293..95daa96 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.DataFile;
@@ -34,8 +35,8 @@ class IcebergStreamWriter<T> extends 
AbstractStreamOperator<DataFile>
   private static final long serialVersionUID = 1L;
 
   private final String fullTableName;
+  private final TaskWriterFactory<T> taskWriterFactory;
 
-  private transient TaskWriterFactory<T> taskWriterFactory;
   private transient TaskWriter<T> writer;
   private transient int subTaskId;
   private transient int attemptId;
@@ -43,6 +44,7 @@ class IcebergStreamWriter<T> extends 
AbstractStreamOperator<DataFile>
   IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> 
taskWriterFactory) {
     this.fullTableName = fullTableName;
     this.taskWriterFactory = taskWriterFactory;
+    setChainingStrategy(ChainingStrategy.ALWAYS);
   }
 
   @Override
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
similarity index 98%
rename from 
flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java
rename to 
flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
index 78c29c5..ad84697 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java
+++ 
b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
 import java.util.Map;
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
similarity index 80%
rename from 
flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
rename to 
flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index e8c5301..f50f432 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
+++ 
b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.util.Map;
 import org.apache.flink.table.data.RowData;
@@ -33,6 +34,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
@@ -42,9 +44,10 @@ import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final Schema schema;
   private final RowType flinkSchema;
   private final PartitionSpec spec;
@@ -55,17 +58,17 @@ class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
   private final FileFormat format;
   private final FileAppenderFactory<RowData> appenderFactory;
 
-  private OutputFileFactory outputFileFactory;
-
-  RowDataTaskWriterFactory(Schema schema,
-                           RowType flinkSchema,
-                           PartitionSpec spec,
-                           LocationProvider locations,
-                           FileIO io,
-                           EncryptionManager encryptionManager,
-                           long targetFileSizeBytes,
-                           FileFormat format,
-                           Map<String, String> tableProperties) {
+  private transient OutputFileFactory outputFileFactory;
+
+  public RowDataTaskWriterFactory(Schema schema,
+                                  RowType flinkSchema,
+                                  PartitionSpec spec,
+                                  LocationProvider locations,
+                                  FileIO io,
+                                  EncryptionManager encryptionManager,
+                                  long targetFileSizeBytes,
+                                  FileFormat format,
+                                  Map<String, String> tableProperties) {
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.spec = spec;
@@ -115,12 +118,12 @@ class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
     }
   }
 
-  private static class FlinkFileAppenderFactory implements 
FileAppenderFactory<RowData> {
+  public static class FlinkFileAppenderFactory implements 
FileAppenderFactory<RowData>, Serializable {
     private final Schema schema;
     private final RowType flinkSchema;
     private final Map<String, String> props;
 
-    private FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, 
Map<String, String> props) {
+    public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, 
Map<String, String> props) {
       this.schema = schema;
       this.flinkSchema = flinkSchema;
       this.props = props;
@@ -128,7 +131,6 @@ class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
 
     @Override
     public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat 
format) {
-      // TODO MetricsConfig will be used for building parquet RowData writer.
       MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
       try {
         switch (format) {
@@ -149,6 +151,14 @@ class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
                 .build();
 
           case PARQUET:
+            return Parquet.write(outputFile)
+                .createWriterFunc(msgType -> 
FlinkParquetWriters.buildWriter(flinkSchema, msgType))
+                .setAll(props)
+                .metricsConfig(metricsConfig)
+                .schema(schema)
+                .overwrite()
+                .build();
+
           default:
             throw new UnsupportedOperationException("Cannot write unknown file 
format: " + format);
         }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java
similarity index 99%
rename from flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
rename to flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java
index cf0c09c..27e97ae 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java 
b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
similarity index 97%
rename from flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
rename to 
flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
index c47da24..6ed7696 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.Serializable;
 import org.apache.iceberg.io.TaskWriter;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java 
b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 82c2a89..b377e54 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -22,44 +22,58 @@ package org.apache.iceberg.flink;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+
 public class SimpleDataUtil {
 
   private SimpleDataUtil() {
   }
 
-  static final Schema SCHEMA = new Schema(
+  public static final Schema SCHEMA = new Schema(
       Types.NestedField.optional(1, "id", Types.IntegerType.get()),
       Types.NestedField.optional(2, "data", Types.StringType.get())
   );
 
-  static final TableSchema FLINK_SCHEMA = TableSchema.builder()
+  public static final TableSchema FLINK_SCHEMA = TableSchema.builder()
       .field("id", DataTypes.INT())
       .field("data", DataTypes.STRING())
       .build();
 
-  static final Record RECORD = GenericRecord.create(SCHEMA);
+  public static final RowType ROW_TYPE = (RowType) 
FLINK_SCHEMA.toRowDataType().getLogicalType();
+
+  public static final Record RECORD = GenericRecord.create(SCHEMA);
 
-  static Table createTable(String path, Map<String, String> properties, 
boolean partitioned) {
+  public static Table createTable(String path, Map<String, String> properties, 
boolean partitioned) {
     PartitionSpec spec;
     if (partitioned) {
       spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
@@ -69,34 +83,55 @@ public class SimpleDataUtil {
     return new HadoopTables().create(SCHEMA, spec, properties, path);
   }
 
-  static Record createRecord(Integer id, String data) {
+  public static Record createRecord(Integer id, String data) {
     Record record = RECORD.copy();
     record.setField("id", id);
     record.setField("data", data);
     return record;
   }
 
-  static RowData createRowData(Integer id, String data) {
+  public static RowData createRowData(Integer id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }
 
-  static void assertTableRows(String tablePath, List<RowData> rows) throws 
IOException {
-    List<Record> records = Lists.newArrayList();
-    for (RowData row : rows) {
+  public static DataFile writeFile(Schema schema, PartitionSpec spec, 
Configuration conf,
+                                   String location, String filename, 
List<RowData> rows)
+      throws IOException {
+    Path path = new Path(location, filename);
+    FileFormat fileFormat = FileFormat.fromFileName(filename);
+    Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: 
%s", filename);
+
+    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
+    FileAppenderFactory<RowData> appenderFactory = new 
RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema,
+        flinkSchema, ImmutableMap.of());
+
+    FileAppender<RowData> appender = 
appenderFactory.newAppender(fromPath(path, conf), fileFormat);
+    try (FileAppender<RowData> closeableAppender = appender) {
+      closeableAppender.addAll(rows);
+    }
+
+    return DataFiles.builder(spec)
+        .withInputFile(HadoopInputFile.fromPath(path, conf))
+        .withMetrics(appender.metrics())
+        .build();
+  }
+
+  public static void assertTableRows(String tablePath, List<RowData> expected) 
throws IOException {
+    List<Record> expectedRecords = Lists.newArrayList();
+    for (RowData row : expected) {
       Integer id = row.isNullAt(0) ? null : row.getInt(0);
       String data = row.isNullAt(1) ? null : row.getString(1).toString();
-      records.add(createRecord(id, data));
+      expectedRecords.add(createRecord(id, data));
     }
-    assertTableRecords(tablePath, records);
+    assertTableRecords(tablePath, expectedRecords);
   }
 
-  static void assertTableRecords(String tablePath, List<Record> expected) 
throws IOException {
+  public static void assertTableRecords(String tablePath, List<Record> 
expected) throws IOException {
     Preconditions.checkArgument(expected != null, "expected records shouldn't 
be null");
     Table newTable = new HadoopTables().load(tablePath);
-    Set<Record> resultSet;
-    try (CloseableIterable<Record> iterable = (CloseableIterable<Record>) 
IcebergGenerics.read(newTable).build()) {
-      resultSet = Sets.newHashSet(iterable);
+    try (CloseableIterable<Record> iterable = 
IcebergGenerics.read(newTable).build()) {
+      Assert.assertEquals("Should produce the expected record",
+          Sets.newHashSet(expected), Sets.newHashSet(iterable));
     }
-    Assert.assertEquals("Should produce the expected record", resultSet, 
Sets.newHashSet(expected));
   }
 }
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
new file mode 100644
index 0000000..0aabfda
--- /dev/null
+++ 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSink extends AbstractTestBase {
+  private static final Configuration CONF = new Configuration();
+  private static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(
+      SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER = new 
DataFormatConverters.RowConverter(
+      SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private String tablePath;
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned 
= {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro", 1, true},
+        new Object[] {"avro", 1, false},
+        new Object[] {"avro", 2, true},
+        new Object[] {"avro", 2, false},
+        new Object[] {"orc", 1, true},
+        new Object[] {"orc", 1, false},
+        new Object[] {"orc", 2, true},
+        new Object[] {"orc", 2, false},
+        new Object[] {"parquet", 1, true},
+        new Object[] {"parquet", 1, false},
+        new Object[] {"parquet", 2, true},
+        new Object[] {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkIcebergSink(String format, int parallelism, boolean 
partitioned) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void before() throws IOException {
+    File folder = tempFolder.newFolder();
+    String warehouse = folder.getAbsolutePath();
+
+    tablePath = warehouse.concat("/test");
+    Assert.assertTrue("Should create the table path correctly.", new 
File(tablePath).mkdir());
+
+    Map<String, String> props = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .enableCheckpointing(100)
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    tableLoader = TableLoader.fromHadoopTable(tablePath);
+  }
+
+  private List<RowData> convertToRowData(List<Row> rows) {
+    return 
rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+
+  @Test
+  public void testWriteRowData() throws Exception {
+    List<Row> rows = Lists.newArrayList(
+        Row.of(1, "hello"),
+        Row.of(2, "world"),
+        Row.of(3, "foo")
+    );
+    DataStream<RowData> dataStream = env.addSource(new 
FiniteTestSource<>(rows), ROW_TYPE_INFO)
+        .map(CONVERTER::toInternal, 
RowDataTypeInfo.of(SimpleDataUtil.ROW_TYPE));
+
+    FlinkSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .hadoopConf(CONF)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream");
+
+    // Assert the iceberg table's records. NOTICE: the FiniteTestSource will 
checkpoint the same rows twice, so it will
+    // commit the same row list into iceberg twice.
+    List<RowData> expectedRows = 
Lists.newArrayList(Iterables.concat(convertToRowData(rows), 
convertToRowData(rows)));
+    SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+  }
+
+  private void testWriteRow(TableSchema tableSchema) throws Exception {
+    List<Row> rows = Lists.newArrayList(
+        Row.of(4, "bar"),
+        Row.of(5, "apache")
+    );
+    DataStream<Row> dataStream = env.addSource(new FiniteTestSource<>(rows), 
ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .hadoopConf(CONF)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    List<RowData> expectedRows = 
Lists.newArrayList(Iterables.concat(convertToRowData(rows), 
convertToRowData(rows)));
+    SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+  }
+
+  @Test
+  public void testWriteRow() throws Exception {
+    testWriteRow(null);
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA);
+  }
+}
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
new file mode 100644
index 0000000..e5a9a8d
--- /dev/null
+++ 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergFilesCommitter {
+  private static final Configuration CONF = new Configuration();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private String tablePath;
+  private Table table;
+
+  private final FileFormat format;
+
+  @Parameterized.Parameters(name = "format = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] {"avro"},
+        new Object[] {"orc"},
+        new Object[] {"parquet"}
+    };
+  }
+
+  public TestIcebergFilesCommitter(String format) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void before() throws IOException {
+    File folder = tempFolder.newFolder();
+    String warehouse = folder.getAbsolutePath();
+
+    tablePath = warehouse.concat("/test");
+    Assert.assertTrue("Should create the table directory correctly.", new 
File(tablePath).mkdir());
+
+    // Construct the iceberg table.
+    Map<String, String> props = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    table = SimpleDataUtil.createTable(tablePath, props, false);
+  }
+
+  @Test
+  public void testCommitTxnWithoutDataFiles() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      SimpleDataUtil.assertTableRows(tablePath, Lists.newArrayList());
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // It's better to advance the max-committed-checkpoint-id in iceberg 
snapshot, so that the future flink job
+      // failover won't fail.
+      for (int i = 1; i <= 3; i++) {
+        harness.snapshot(++checkpointId, ++timestamp);
+        harness.notifyOfCompletedCheckpoint(checkpointId);
+        assertSnapshotSize(i);
+        assertMaxCommittedCheckpointId(jobId, checkpointId);
+      }
+    }
+  }
+
+  @Test
+  public void testCommitTxn() throws Exception {
+    // Test with 3 continues checkpoints:
+    //   1. snapshotState for checkpoint#1
+    //   2. notifyCheckpointComplete for checkpoint#1
+    //   3. snapshotState for checkpoint#2
+    //   4. notifyCheckpointComplete for checkpoint#2
+    //   5. snapshotState for checkpoint#3
+    //   6. notifyCheckpointComplete for checkpoint#3
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+      for (int i = 1; i <= 3; i++) {
+        RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i);
+        DataFile dataFile = writeDataFile("data-" + i, 
ImmutableList.of(rowData));
+        harness.processElement(dataFile, ++timestamp);
+        rows.add(rowData);
+
+        harness.snapshot(i, ++timestamp);
+
+        harness.notifyOfCompletedCheckpoint(i);
+
+        SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows));
+        assertSnapshotSize(i);
+        assertMaxCommittedCheckpointId(jobID, i);
+      }
+    }
+  }
+
+  @Test
+  public void testOrderedEventsBetweenCheckpoints() throws Exception {
+    // It's possible that two checkpoints happen in the following orders:
+    //   1. snapshotState for checkpoint#1;
+    //   2. snapshotState for checkpoint#2;
+    //   3. notifyCheckpointComplete for checkpoint#1;
+    //   4. notifyCheckpointComplete for checkpoint#2;
+    long timestamp = 0;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+      harness.processElement(dataFile1, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 1. snapshotState for checkpoint#1
+      long firstCheckpointId = 1;
+      harness.snapshot(firstCheckpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(2, "world");
+      DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+      harness.processElement(dataFile2, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 2. snapshotState for checkpoint#2
+      long secondCheckpointId = 2;
+      harness.snapshot(secondCheckpointId, ++timestamp);
+
+      // 3. notifyCheckpointComplete for checkpoint#1
+      harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1));
+      assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+
+      // 4. notifyCheckpointComplete for checkpoint#2
+      harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+    }
+  }
+
+  @Test
+  public void testDisorderedEventsBetweenCheckpoints() throws Exception {
+    // It's possible that the two checkpoints happen in the following orders:
+    //   1. snapshotState for checkpoint#1;
+    //   2. snapshotState for checkpoint#2;
+    //   3. notifyCheckpointComplete for checkpoint#2;
+    //   4. notifyCheckpointComplete for checkpoint#1;
+    long timestamp = 0;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+      harness.processElement(dataFile1, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 1. snapshotState for checkpoint#1
+      long firstCheckpointId = 1;
+      harness.snapshot(firstCheckpointId, ++timestamp);
+
+      RowData row2 = SimpleDataUtil.createRowData(2, "world");
+      DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+      harness.processElement(dataFile2, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      // 2. snapshotState for checkpoint#2
+      long secondCheckpointId = 2;
+      harness.snapshot(secondCheckpointId, ++timestamp);
+
+      // 3. notifyCheckpointComplete for checkpoint#2
+      harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+
+      // 4. notifyCheckpointComplete for checkpoint#1
+      harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+      assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+    }
+  }
+
+  @Test
+  public void testRecoveryFromInvalidSnapshot() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    OperatorSubtaskState snapshot;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row = SimpleDataUtil.createRowData(1, "hello");
+      DataFile dataFile = writeDataFile("data-1", ImmutableList.of(row));
+
+      harness.processElement(dataFile, ++timestamp);
+      snapshot = harness.snapshot(++checkpointId, ++timestamp);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of());
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      AssertHelpers.assertThrows("Could not restore because there's no valid 
snapshot.",
+          IllegalStateException.class,
+          "There should be an existing iceberg snapshot for current flink job",
+          () -> {
+            harness.initializeState(snapshot);
+            return null;
+          });
+    }
+  }
+
+  @Test
+  public void testRecoveryFromValidSnapshot() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    List<RowData> expectedRows = Lists.newArrayList();
+    OperatorSubtaskState snapshot;
+
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      RowData row = SimpleDataUtil.createRowData(1, "hello");
+      expectedRows.add(row);
+      DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row));
+
+      harness.processElement(dataFile1, ++timestamp);
+      snapshot = harness.snapshot(++checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row));
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, checkpointId);
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.initializeState(snapshot);
+      harness.open();
+
+      SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, checkpointId);
+
+      RowData row = SimpleDataUtil.createRowData(2, "world");
+      expectedRows.add(row);
+      DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row));
+      harness.processElement(dataFile, ++timestamp);
+
+      harness.snapshot(++checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+      assertSnapshotSize(2);
+      assertMaxCommittedCheckpointId(jobId, checkpointId);
+    }
+  }
+
+  @Test
+  public void testStartAnotherJobToWriteSameTable() throws Exception {
+    long checkpointId = 0;
+    long timestamp = 0;
+    List<RowData> rows = Lists.newArrayList();
+    List<RowData> tableRows = Lists.newArrayList();
+
+    JobID oldJobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(oldJobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(oldJobId, -1L);
+
+      for (int i = 1; i <= 3; i++) {
+        rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
+        tableRows.addAll(rows);
+
+        DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
+        harness.processElement(dataFile, ++timestamp);
+        harness.snapshot(++checkpointId, ++timestamp);
+
+        harness.notifyOfCompletedCheckpoint(checkpointId);
+        SimpleDataUtil.assertTableRows(tablePath, tableRows);
+        assertSnapshotSize(i);
+        assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+      }
+    }
+
+    // The new started job will start with checkpoint = 1 again.
+    checkpointId = 0;
+    timestamp = 0;
+    JobID newJobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(newJobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(3);
+      assertMaxCommittedCheckpointId(oldJobId, 3);
+      assertMaxCommittedCheckpointId(newJobId, -1);
+
+      rows.add(SimpleDataUtil.createRowData(2, "world"));
+      tableRows.addAll(rows);
+
+      DataFile dataFile = writeDataFile("data-new-1", rows);
+      harness.processElement(dataFile, ++timestamp);
+      harness.snapshot(++checkpointId, ++timestamp);
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      SimpleDataUtil.assertTableRows(tablePath, tableRows);
+      assertSnapshotSize(4);
+      assertMaxCommittedCheckpointId(newJobId, checkpointId);
+    }
+  }
+
+  @Test
+  public void testMultipleJobsWriteSameTable() throws Exception {
+    long timestamp = 0;
+    List<RowData> tableRows = Lists.newArrayList();
+
+    JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
+    for (int i = 0; i < 20; i++) {
+      int jobIndex = i % 3;
+      int checkpointId = i / 3;
+      JobID jobId = jobs[jobIndex];
+      try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+        harness.setup();
+        harness.open();
+
+        assertSnapshotSize(i);
+        assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 : 
checkpointId);
+
+        List<RowData> rows = 
Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
+        tableRows.addAll(rows);
+
+        DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
+        harness.processElement(dataFile, ++timestamp);
+        harness.snapshot(checkpointId + 1, ++timestamp);
+
+        harness.notifyOfCompletedCheckpoint(checkpointId + 1);
+        SimpleDataUtil.assertTableRows(tablePath, tableRows);
+        assertSnapshotSize(i + 1);
+        assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+      }
+    }
+  }
+
+  @Test
+  public void testBoundedStream() throws Exception {
+    JobID jobId = new JobID();
+    try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = 
createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, -1L);
+
+      List<RowData> tableRows = 
Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
+
+      DataFile dataFile = writeDataFile("data-1", tableRows);
+      harness.processElement(dataFile, 1);
+      ((BoundedOneInput) harness.getOneInputOperator()).endInput();
+
+      SimpleDataUtil.assertTableRows(tablePath, tableRows);
+      assertSnapshotSize(1);
+      assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+    }
+  }
+
+  private DataFile writeDataFile(String filename, List<RowData> rows) throws 
IOException {
+    return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF, 
tablePath, format.addExtension(filename), rows);
+  }
+
+  private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+    table.refresh();
+    long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table, 
jobID.toString());
+    Assert.assertEquals(expectedId, actualId);
+  }
+
+  private void assertSnapshotSize(int expectedSnapshotSize) {
+    table.refresh();
+    Assert.assertEquals(expectedSnapshotSize, 
Lists.newArrayList(table.snapshots()).size());
+  }
+
+  private OneInputStreamOperatorTestHarness<DataFile, Void> 
createStreamSink(JobID jobID)
+      throws Exception {
+    TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+    return new OneInputStreamOperatorTestHarness<>(factory, 
createEnvironment(jobID));
+  }
+
+  private static MockEnvironment createEnvironment(JobID jobID) {
+    return new MockEnvironmentBuilder()
+        .setTaskName("test task")
+        .setManagedMemorySize(32 * 1024)
+        .setInputSplitProvider(new MockInputSplitProvider())
+        .setBufferSize(256)
+        .setTaskConfiguration(new 
org.apache.flink.configuration.Configuration())
+        .setExecutionConfig(new ExecutionConfig())
+        .setMaxParallelism(16)
+        .setJobID(jobID)
+        .build();
+  }
+
+  private static class TestOperatorFactory extends 
AbstractStreamOperatorFactory<Void>
+      implements OneInputStreamOperatorFactory<DataFile, Void> {
+    private final String tablePath;
+
+    private TestOperatorFactory(String tablePath) {
+      this.tablePath = tablePath;
+    }
+
+    private static TestOperatorFactory of(String tablePath) {
+      return new TestOperatorFactory(tablePath);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends StreamOperator<Void>> T 
createStreamOperator(StreamOperatorParameters<Void> param) {
+      IcebergFilesCommitter committer = new 
IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), CONF);
+      committer.setup(param.getContainingTask(), param.getStreamConfig(), 
param.getOutput());
+      return (T) committer;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
+      return IcebergFilesCommitter.class;
+    }
+  }
+}
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
similarity index 97%
rename from 
flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
rename to 
flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index c6eee46..f4e10d5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
+++ 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.File;
 import java.io.IOException;
@@ -45,6 +45,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -70,14 +71,15 @@ public class TestIcebergStreamWriter {
   private final FileFormat format;
   private final boolean partitioned;
 
-  // TODO add Parquet unit test once the readers and writers are ready.
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] {"avro", true},
         new Object[] {"avro", false},
         new Object[] {"orc", true},
-        new Object[] {"orc", false}
+        new Object[] {"orc", false},
+        new Object[] {"parquet", true},
+        new Object[] {"parquet", false}
     };
   }
 
@@ -315,7 +317,7 @@ public class TestIcebergStreamWriter {
 
   private OneInputStreamOperatorTestHarness<RowData, DataFile> 
createIcebergStreamWriter(
       Table icebergTable, TableSchema flinkSchema) throws Exception {
-    IcebergStreamWriter<RowData> streamWriter = 
IcebergSinkUtil.createStreamWriter(icebergTable, flinkSchema);
+    IcebergStreamWriter<RowData> streamWriter = 
FlinkSink.createStreamWriter(icebergTable, flinkSchema);
     OneInputStreamOperatorTestHarness<RowData, DataFile> harness = new 
OneInputStreamOperatorTestHarness<>(
         streamWriter, 1, 1, 0);
 
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
similarity index 99%
rename from 
flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
rename to 
flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
index 22a8654..d8c0de0 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
+++ 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,6 +31,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.RandomRowData;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java 
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
similarity index 97%
rename from flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
rename to flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index bb3841e..c4c6979 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
 
 import java.io.File;
 import java.io.IOException;
@@ -35,6 +35,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.data.RandomRowData;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -55,14 +56,15 @@ public class TestTaskWriters {
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
-  // TODO add Parquet unit test once the readers and writers are ready.
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] {"avro", true},
         new Object[] {"avro", false},
         new Object[] {"orc", true},
-        new Object[] {"orc", false}
+        new Object[] {"orc", false},
+        new Object[] {"parquet", true},
+        new Object[] {"parquet", false}
     };
   }
 

Reply via email to