openinx commented on a change in pull request #1145:
URL: https://github.com/apache/iceberg/pull/1145#discussion_r449967218



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());

Review comment:
       Fine, let's keep the rule.

##########
File path: core/src/main/java/org/apache/iceberg/BaseFile.java
##########
@@ -360,7 +360,7 @@ public ByteBuffer keyMetadata() {
     if (list != null) {
       List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
       copy.addAll(list);
-      return Collections.unmodifiableList(copy);

Review comment:
       OK, I didn't realize there's a way to specify a customized serializer. Let me check how to handle this.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());
+      TypeUtil.validateWriteSchema(readSchema, table.schema(), true, true);
+    }
+
+    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+    // Initialize the task writer.
+    FileFormat fileFormat = getFileFormat();
+    FileAppenderFactory<Row> appenderFactory = new FlinkFileAppenderFactory(table);
+    OutputFileFactory outputFileFactory = new OutputFileFactory(table, fileFormat, subTaskId);
+    this.writer = TaskWriterFactory.createTaskWriter(table.spec(),
+        appenderFactory,
+        outputFileFactory,
+        getTargetFileSizeBytes(),
+        fileFormat);
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", 
tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {
+      emit(dataFile);
+    }
+    // Remember to clear the writer's cached complete files.
+    writer.reset();
+    LOG.info("Iceberg writer {} subtask {} completed preparing for checkpoint 
{}", tablePath, subTaskId, checkpointId);
+  }
+
+  @Override
+  public void processElement(StreamRecord<Row> element) throws Exception {
+    Row value = element.getValue();
+    writer.append(value);
+
+    // Emit the data file entries to downstream committer operator if there exist any complete files.
+    List<DataFile> completeFiles = writer.getCompleteFiles();
+    if (!completeFiles.isEmpty()) {
+      completeFiles.forEach(this::emit);
+      // Remember to clear the writer's cached complete files.
+      writer.reset();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+
+  private void emit(DataFile dataFile) {
+    output.collect(new StreamRecord<>(new SerializableDataFile(dataFile)));
+  }
+
+  private FileFormat getFileFormat() {

Review comment:
       Here we don't create a file format, we just parse the `FileFormat` from the table properties, so the `get` prefix sounds good to me.
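
   To make that concrete, a quick sketch of what I mean by "parse from properties" (the exact method body here is just an illustration of the approach, not necessarily the final code in this class):

   ```java
   import java.util.Locale;
   import org.apache.iceberg.FileFormat;
   import org.apache.iceberg.Table;

   import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
   import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

   class FileFormatSketch {
     // Resolve the write format from the table's properties, falling back to the default ("parquet").
     static FileFormat getFileFormat(Table table) {
       String formatName = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
       return FileFormat.valueOf(formatName.toUpperCase(Locale.ENGLISH));
     }
   }
   ```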

##########
File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+
+  protected DataFile closeFileAppender(FileAppender<T> fileAppender, EncryptedOutputFile currentFile,
+                                       PartitionSpec spec, StructLike partitionKey) throws IOException {
+    // Close the file appender firstly.
+    fileAppender.close();
+
+    // metrics are only valid after the appender is closed.
+    Metrics metrics = fileAppender.metrics();
+    long fileSizeInBytes = fileAppender.length();
+    List<Long> splitOffsets = fileAppender.splitOffsets();
+
+    // Construct the DataFile and add it into the completeDataFiles.

Review comment:
       It's true, let's remove it.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());

Review comment:
       Because the `readSchema` is converted from the user-provided flink table schema, its field ids don't match the field ids of the target iceberg table. Without reassigning them, the following `validateWriteSchema` would treat the two schemas as incompatible purely because of the differing ids. For example:
   
   ```
   Cannot write incompatible dataset to table with schema:
   table {
     0: id: optional int
     1: data: optional string
   }
   write schema:table {
     1: id: optional int
     2: data: optional string
   }
   Problems:
   * data: int cannot be promoted to string
   java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
   table {
     0: id: optional int
     1: data: optional string
   }
   write schema:table {
     1: id: optional int
     2: data: optional string
   }
   Problems:
   * data: int cannot be promoted to string
        at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:216)
        at org.apache.iceberg.flink.IcebergStreamWriter.open(IcebergStreamWriter.java:97)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:558)
        at org.apache.iceberg.flink.TestIcebergStreamWriter.testWritingTable(TestIcebergStreamWriter.java:94)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
        at java.lang.Thread.run(Thread.java:748)
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}

Review comment:
       Yes, it's a code comment, and we expect that `IcebergStreamWriter` won't be exposed to upper-layer iceberg users: we will have a wrapper that hides the implementation details of `IcebergStreamWriter` and `IcebergFilesCommitter` and only exposes a `DataStream<Row>` to the end user. I can remove this comment if you think that's OK. NOTE: a follow-up patch will implement the `IcebergFilesCommitter`, which collects all the data files emitted by `IcebergStreamWriter` and commits the iceberg transaction with parallelism 1.
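
   To make that intent concrete, here is a very rough sketch of the wrapper shape I have in mind; the builder name, the `createFor` factory, and the `IcebergFilesCommitter` operator are all placeholders for the follow-up patch, so please read it as a shape rather than an API proposal:

   ```java
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.types.Row;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.Schema;

   public class IcebergSinkBuilder {
     // Wire the writer and the (future) committer so callers only ever deal with DataStream<Row>.
     public static void appendTo(DataStream<Row> rows, String tablePath, Schema schema, Configuration conf) {
       rows
           // the writer runs with the parallelism of the input stream and emits completed data files
           .transform("IcebergStreamWriter", TypeInformation.of(SerializableDataFile.class),
               IcebergStreamWriter.createFor(tablePath, schema, conf))   // hypothetical factory method
           // the committer collects those files and commits the iceberg transaction
           .transform("IcebergFilesCommitter", TypeInformation.of(Void.class),
               new IcebergFilesCommitter(tablePath, conf))               // coming in the next patch
           .setParallelism(1);
     }
   }
   ```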

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());
+      TypeUtil.validateWriteSchema(readSchema, table.schema(), true, true);
+    }
+
+    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+    // Initialize the task writer.
+    FileFormat fileFormat = getFileFormat();
+    FileAppenderFactory<Row> appenderFactory = new FlinkFileAppenderFactory(table);
+    OutputFileFactory outputFileFactory = new OutputFileFactory(table, fileFormat, subTaskId);
+    this.writer = TaskWriterFactory.createTaskWriter(table.spec(),
+        appenderFactory,
+        outputFileFactory,
+        getTargetFileSizeBytes(),
+        fileFormat);
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", 
tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {
+      emit(dataFile);
+    }
+    // Remember to clear the writer's cached complete files.
+    writer.reset();
+    LOG.info("Iceberg writer {} subtask {} completed preparing for checkpoint 
{}", tablePath, subTaskId, checkpointId);
+  }
+
+  @Override
+  public void processElement(StreamRecord<Row> element) throws Exception {
+    Row value = element.getValue();
+    writer.append(value);
+
+    // Emit the data file entries to downstream committer operator if there exist any complete files.
+    List<DataFile> completeFiles = writer.getCompleteFiles();
+    if (!completeFiles.isEmpty()) {
+      completeFiles.forEach(this::emit);
+      // Remember to clear the writer's cached complete files.
+      writer.reset();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+
+  private void emit(DataFile dataFile) {
+    output.collect(new StreamRecord<>(new SerializableDataFile(dataFile)));
+  }
+
+  private FileFormat getFileFormat() {

Review comment:
       Well, let me think about this.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());
+      TypeUtil.validateWriteSchema(readSchema, table.schema(), true, true);
+    }
+
+    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+    // Initialize the task writer.
+    FileFormat fileFormat = getFileFormat();
+    FileAppenderFactory<Row> appenderFactory = new FlinkFileAppenderFactory(table);
+    OutputFileFactory outputFileFactory = new OutputFileFactory(table, fileFormat, subTaskId);
+    this.writer = TaskWriterFactory.createTaskWriter(table.spec(),
+        appenderFactory,
+        outputFileFactory,
+        getTargetFileSizeBytes(),
+        fileFormat);
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", 
tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {

Review comment:
       Well, sounds good to me.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/PartitionKey.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.types.Types;
+
+public class PartitionKey implements StructLike {
+
+  private final Object[] partitionTuple;
+
+  private PartitionKey(Object[] partitionTuple) {
+    this.partitionTuple = partitionTuple;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof PartitionKey)) {
+      return false;
+    }
+
+    PartitionKey that = (PartitionKey) o;
+    return Arrays.equals(partitionTuple, that.partitionTuple);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(partitionTuple);
+  }
+
+  @Override
+  public int size() {
+    return partitionTuple.length;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(partitionTuple[pos]);
+  }
+
+  public Object[] getPartitionTuple() {
+    return partitionTuple;
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    partitionTuple[pos] = value;
+  }
+
+  private static Map<Integer, Integer> buildFieldId2PosMap(Schema schema) {
+    Map<Integer, Integer> fieldId2Position = Maps.newHashMap();
+    List<Types.NestedField> nestedFields = schema.asStruct().fields();
+    for (int i = 0; i < nestedFields.size(); i++) {
+      fieldId2Position.put(nestedFields.get(i).fieldId(), i);
+    }
+    return fieldId2Position;
+  }
+
+  public static Builder builder(PartitionSpec spec) {
+    return new Builder(spec);
+  }
+
+  public static class Builder {
+    private final int size;
+
+    private final int[] pos;
+    private final Transform[] transforms;
+
+    private Builder(PartitionSpec spec) {
+      List<PartitionField> fields = spec.fields();
+      this.size = fields.size();
+      this.pos = new int[size];
+      this.transforms = new Transform[size];
+
+      Map<Integer, Integer> fieldId2Pos = buildFieldId2PosMap(spec.schema());

Review comment:
       My thinking was that the `PartitionSpec` only uses root-level fields, so I simplified the `accessor` approach to `buildFieldId2PosMap`... I'm not quite sure whether we need the more complex tree traversal; let me take a deeper look.
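
   The case I need to double-check is a nested partition source column. Whether that even needs to be supported is exactly the open question, but if it does, a flat fieldId-to-top-level-position map would not find the source field. A made-up example of such a schema (illustrative only):

   ```java
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.types.Types;

   class NestedPartitionExample {
     // "event_time" lives inside the "meta" struct, so it has no top-level position;
     // buildFieldId2PosMap(schema) would only index field ids 1 ("id") and 2 ("meta").
     static final Schema SCHEMA = new Schema(
         Types.NestedField.required(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "meta", Types.StructType.of(
             Types.NestedField.optional(3, "event_time", Types.TimestampType.withZone()))));

     static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
         .hour("meta.event_time")
         .build();
   }
   ```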

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());
+      TypeUtil.validateWriteSchema(readSchema, table.schema(), true, true);
+    }
+
+    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+    // Initialize the task writer.
+    FileFormat fileFormat = getFileFormat();
+    FileAppenderFactory<Row> appenderFactory = new FlinkFileAppenderFactory(table);
+    OutputFileFactory outputFileFactory = new OutputFileFactory(table, fileFormat, subTaskId);
+    this.writer = TaskWriterFactory.createTaskWriter(table.spec(),
+        appenderFactory,
+        outputFileFactory,
+        getTargetFileSizeBytes(),
+        fileFormat);
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", 
tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {
+      emit(dataFile);
+    }
+    // Remember to clear the writer's cached complete files.
+    writer.reset();
+    LOG.info("Iceberg writer {} subtask {} completed preparing for checkpoint 
{}", tablePath, subTaskId, checkpointId);
+  }
+
+  @Override
+  public void processElement(StreamRecord<Row> element) throws Exception {
+    Row value = element.getValue();
+    writer.append(value);
+
+    // Emit the data file entries to downstream committer operator if there exist any complete files.
+    List<DataFile> completeFiles = writer.getCompleteFiles();
+    if (!completeFiles.isEmpty()) {
+      completeFiles.forEach(this::emit);
+      // Remember to clear the writer's cached complete files.
+      writer.reset();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();

Review comment:
       @rdblue, I guess the real question here is: we close the writer without emitting its data files downstream, so will that break the `exactly-once` semantics? Let's take an example. Say we have the record stream:
   
   ```
   1 2 3 4  5 6 7 8 9 10 11 12  ...
   ``` 
   
   and the emitted data file list would be: 
   
   ```
   datafile0:   1,2,3
   datafile1:   4,5,6
   datafile2:   7,8,9
   datafile3:   10,11,12
   ```
   Assuming the checkpoint happens between records 6 and 7, we will emit `datafile0` and `datafile1` to the downstream operator. If we then close the writer (exceptionally or intentionally) after record 11, we will emit `datafile2` to the downstream operator but ignore the still-open `datafile3`. This doesn't break the `exactly-once` semantics, because the snapshot records that `IcebergStreamWriter` has processed records `[1, 6]` and the downstream operator's snapshot state contains `datafile0` and `datafile1`. Once recovered, we will replay records 7, 8, 9, 10, 11, 12, ..., so `datafile2` and `datafile3` will be re-generated and re-emitted to the downstream operator; that is why we don't need to emit data files downstream when closing. Besides, even if we did emit `datafile3` to the downstream operator in the `close` method, it would be discarded on recovery.
   
   (I provided a unit test to address  this case).
   
   

##########
File path: flink/src/main/java/org/apache/iceberg/flink/TableUtil.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+
+class TableUtil {
+
+  private TableUtil() {
+  }
+
+  static Table findTable(String path, Configuration conf) {

Review comment:
       @JingsongLi it seems we could work on those things in parallel. I will focus on the streaming writer and keep the simple `findTable` here, and you could open a pull request that follows the `buildIcebergCatalog` approach to integrate the iceberg catalog with the flink catalog (if you want).
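
   For the record, the simple `findTable` I have in mind is roughly the following; the slash-vs-identifier convention is just the assumption I am using for now, and the catalog integration could replace it later:

   ```java
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.hadoop.HadoopTables;
   import org.apache.iceberg.hive.HiveCatalog;
   import org.apache.iceberg.hive.HiveCatalogs;

   class TableUtilSketch {
     // Treat slash-style locations as HadoopTables paths and anything else as a Hive identifier ("db.table").
     static Table findTable(String path, Configuration conf) {
       if (path.contains("/")) {
         return new HadoopTables(conf).load(path);
       }
       HiveCatalog catalog = HiveCatalogs.loadCatalog(conf);
       return catalog.loadTable(TableIdentifier.parse(path));
     }
   }
   ```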

##########
File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+
+  protected DataFile closeFileAppender(FileAppender<T> fileAppender, EncryptedOutputFile currentFile,
+                                       PartitionSpec spec, StructLike partitionKey) throws IOException {
+    // Close the file appender firstly.
+    fileAppender.close();
+
+    // metrics are only valid after the appender is closed.
+    Metrics metrics = fileAppender.metrics();
+    long fileSizeInBytes = fileAppender.length();
+    List<Long> splitOffsets = fileAppender.splitOffsets();
+
+    // Construct the DataFile and add it into the completeDataFiles.
+    return DataFiles.builder(spec)
+        .withEncryptedOutputFile(currentFile)
+        .withPath(currentFile.encryptingOutputFile().location())

Review comment:
       OK, thanks for the reminder.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+
+  protected DataFile closeFileAppender(FileAppender<T> fileAppender, EncryptedOutputFile currentFile,
+                                       PartitionSpec spec, StructLike partitionKey) throws IOException {
+    // Close the file appender firstly.
+    fileAppender.close();
+
+    // metrics are only valid after the appender is closed.
+    Metrics metrics = fileAppender.metrics();
+    long fileSizeInBytes = fileAppender.length();
+    List<Long> splitOffsets = fileAppender.splitOffsets();
+
+    // Construct the DataFile and add it into the completeDataFiles.
+    return DataFiles.builder(spec)

Review comment:
       We actually skip the empty `DataFile` construction in the upper layer; you can see this in `PartitionWriter` and `UnpartitionedWriter` (we also have a unit test covering it), so there is no need to check it here anymore.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+
+  protected DataFile closeFileAppender(FileAppender<T> fileAppender, EncryptedOutputFile currentFile,
+                                       PartitionSpec spec, StructLike partitionKey) throws IOException {
+    // Close the file appender firstly.
+    fileAppender.close();
+
+    // metrics are only valid after the appender is closed.
+    Metrics metrics = fileAppender.metrics();
+    long fileSizeInBytes = fileAppender.length();
+    List<Long> splitOffsets = fileAppender.splitOffsets();
+
+    // Construct the DataFile and add it into the completeDataFiles.
+    return DataFiles.builder(spec)
+        .withEncryptedOutputFile(currentFile)
+        .withPath(currentFile.encryptingOutputFile().location())
+        .withFileSizeInBytes(fileSizeInBytes)
+        .withPartition(spec.fields().size() == 0 ? null : partitionKey)

Review comment:
       It is not necessary to keep the `partitionKey` reference when building 
DataFiles.  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
