JingsongLi commented on a change in pull request #1145:
URL: https://github.com/apache/iceberg/pull/1145#discussion_r449359952



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in 
{@link DataStream#addSink(SinkFunction)}

Review comment:
       Operator in Flink will be serialized into `byte[]` whatever. So I think 
it is true about `set all the non-serializable members to be null`. You don't 
need add this comment.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in 
{@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, 
ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, 
Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());
+      TypeUtil.validateWriteSchema(readSchema, table.schema(), true, true);
+    }
+
+    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+    // Initialize the task writer.
+    FileFormat fileFormat = getFileFormat();
+    FileAppenderFactory<Row> appenderFactory = new 
FlinkFileAppenderFactory(table);
+    OutputFileFactory outputFileFactory = new OutputFileFactory(table, 
fileFormat, subTaskId);
+    this.writer = TaskWriterFactory.createTaskWriter(table.spec(),
+        appenderFactory,
+        outputFileFactory,
+        getTargetFileSizeBytes(),
+        fileFormat);
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", 
tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {

Review comment:
       I can see always a `reset` after `getCompleteFiles`. Can just provides a 
`pollCompleteFiles`?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.flink.writer.FileAppenderFactory;
+import org.apache.iceberg.flink.writer.OutputFileFactory;
+import org.apache.iceberg.flink.writer.TaskWriter;
+import org.apache.iceberg.flink.writer.TaskWriterFactory;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+class IcebergStreamWriter extends AbstractStreamOperator<SerializableDataFile>
+    implements OneInputStreamOperator<Row, SerializableDataFile> {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergStreamWriter.class);
+
+  private final String tablePath;
+  private final SerializableConfiguration conf;
+  private Schema readSchema;
+
+  private transient Table table;
+  private transient TaskWriter<Row> writer;
+  private transient int subTaskId;
+
+  /**
+   * Be careful to do the initialization in this constructor, because in 
{@link DataStream#addSink(SinkFunction)}
+   * it will call {@link ClosureCleaner#clean(Object, 
ExecutionConfig.ClosureCleanerLevel, boolean)} to set all the
+   * non-serializable members to be null.
+   *
+   * @param tablePath  The base path of the iceberg table.
+   * @param readSchema The schema of source data.
+   * @param conf       The hadoop's configuration.
+   */
+  private IcebergStreamWriter(String tablePath, Schema readSchema, 
Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());
+      TypeUtil.validateWriteSchema(readSchema, table.schema(), true, true);
+    }
+
+    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+
+    // Initialize the task writer.
+    FileFormat fileFormat = getFileFormat();
+    FileAppenderFactory<Row> appenderFactory = new 
FlinkFileAppenderFactory(table);
+    OutputFileFactory outputFileFactory = new OutputFileFactory(table, 
fileFormat, subTaskId);
+    this.writer = TaskWriterFactory.createTaskWriter(table.spec(),
+        appenderFactory,
+        outputFileFactory,
+        getTargetFileSizeBytes(),
+        fileFormat);
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", 
tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {
+      emit(dataFile);
+    }
+    // Remember to clear the writer's cached complete files.
+    writer.reset();
+    LOG.info("Iceberg writer {} subtask {} completed preparing for checkpoint 
{}", tablePath, subTaskId, checkpointId);
+  }
+
+  @Override
+  public void processElement(StreamRecord<Row> element) throws Exception {
+    Row value = element.getValue();
+    writer.append(value);
+
+    // Emit the data file entries to downstream committer operator if there 
exist any complete files.
+    List<DataFile> completeFiles = writer.getCompleteFiles();
+    if (!completeFiles.isEmpty()) {
+      completeFiles.forEach(this::emit);
+      // Remember to clear the writer's cached complete files.
+      writer.reset();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+
+  private void emit(DataFile dataFile) {
+    output.collect(new StreamRecord<>(new SerializableDataFile(dataFile)));
+  }
+
+  private FileFormat getFileFormat() {

Review comment:
       `createFileFormat `?

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/writer/PartitionWriter.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.flink.PartitionKey;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The PartitionFanoutWriter will open a writing data file for each partition 
and route the given record to the
+ * corresponding data file in the correct partition.
+ *
+ * @param <T> defines the data type of record to write.
+ */
+class PartitionWriter<T> extends BaseTaskWriter<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionWriter.class);
+
+  private final PartitionSpec spec;
+  private final FileAppenderFactory<T> factory;
+  private final Function<PartitionKey, EncryptedOutputFile> outputFileGetter;
+  private final Function<T, PartitionKey> keyGetter;
+  private final Map<PartitionKey, WrappedFileAppender<T>> writers;
+  private final long targetFileSize;
+  private final FileFormat fileFormat;
+  private final List<DataFile> completeDataFiles;
+
+  PartitionWriter(PartitionSpec spec,
+                  FileAppenderFactory<T> factory,
+                  Function<PartitionKey, EncryptedOutputFile> outputFileGetter,
+                  Function<T, PartitionKey> keyGetter,
+                  long targetFileSize,
+                  FileFormat fileFormat) {
+    this.spec = spec;
+    this.factory = factory;
+    this.outputFileGetter = outputFileGetter;
+    this.keyGetter = keyGetter;
+    this.writers = Maps.newHashMap();
+    this.targetFileSize = targetFileSize;
+    this.fileFormat = fileFormat;
+    this.completeDataFiles = Lists.newArrayList();
+  }
+
+  @Override
+  public void append(T record) throws IOException {
+    PartitionKey partitionKey = keyGetter.apply(record);
+    Preconditions.checkArgument(partitionKey != null, "Partition key shouldn't 
be null");
+
+    WrappedFileAppender<T> writer = writers.get(partitionKey);
+    if (writer == null) {
+      writer = createWrappedFileAppender(partitionKey);
+      writers.put(partitionKey, writer);
+    }
+    writer.fileAppender.add(record);
+
+    // Roll the writer if reach the target file size.
+    writer.currentRows++;
+    if (writer.currentRows % ROW_DIVISOR == 0 && writer.fileAppender.length() 
>= targetFileSize) {
+      closeCurrentWriter(writer);
+      writers.remove(partitionKey);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (WrappedFileAppender<T> wrap : writers.values()) {
+      closeCurrentWriter(wrap);
+      LOG.debug("Close file appender: {}, completeDataFiles: {}",
+          wrap.encryptedOutputFile.encryptingOutputFile().location(),
+          completeDataFiles.size());
+    }
+    this.writers.clear();
+  }
+
+  @Override
+  public List<DataFile> getCompleteFiles() {
+    if (completeDataFiles.size() > 0) {
+      return ImmutableList.copyOf(this.completeDataFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  @Override
+  public void reset() {
+    this.completeDataFiles.clear();
+  }
+
+  private WrappedFileAppender<T> createWrappedFileAppender(PartitionKey 
partitionKey) {
+    EncryptedOutputFile outputFile = outputFileGetter.apply(partitionKey);
+    FileAppender<T> appender = 
factory.newAppender(outputFile.encryptingOutputFile(), fileFormat);
+    return new WrappedFileAppender<>(partitionKey, outputFile, appender);
+  }
+
+  private void closeCurrentWriter(WrappedFileAppender<T> wrap) throws 
IOException {
+    DataFile dataFile = closeFileAppender(wrap.fileAppender, 
wrap.encryptedOutputFile, spec, wrap.partitionKey);
+    completeDataFiles.add(dataFile);
+  }
+
+  private static class WrappedFileAppender<T> {
+    private final PartitionKey partitionKey;
+    private final EncryptedOutputFile encryptedOutputFile;
+    private final FileAppender<T> fileAppender;
+
+    private long currentRows = 0;
+
+    WrappedFileAppender(PartitionKey partitionKey,
+                        EncryptedOutputFile encryptedOutputFile,
+                        FileAppender<T> fileAppender) {
+      this.partitionKey = partitionKey;
+      this.encryptedOutputFile = encryptedOutputFile;
+      this.fileAppender = fileAppender;
+    }

Review comment:
       add a method in this class: `add(...)`, you can increment `currentRows` 
here.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/writer/TaskWriter.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+
+/**
+ * The writer interface which could accept records and provide the generated 
data files.
+ *
+ * @param <T> to indicate the record data type.
+ */
+public interface TaskWriter<T> {
+  int ROW_DIVISOR = 1000;
+
+  /**
+   * Append the row into the data files.
+   */
+  void append(T record) throws IOException;
+
+  /**
+   * Close the writer.
+   */
+  void close() throws IOException;
+
+  /**
+   * To get the full list of complete files, we should call this method after 
{@link TaskWriter#close()} because the
+   * close method will close all the opening data files and build {@link 
DataFile} to the return array list.
+   *
+   * @return the cached completed data files of this task writer.
+   */
+  List<DataFile> getCompleteFiles();
+
+  /**
+   * Reset to clear all the cached complete files.
+   */
+  void reset();

Review comment:
       A `reset` in `TaskWriter` is just clear complete files?

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/writer/PartitionWriter.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.flink.PartitionKey;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The PartitionFanoutWriter will open a writing data file for each partition 
and route the given record to the
+ * corresponding data file in the correct partition.
+ *
+ * @param <T> defines the data type of record to write.
+ */
+class PartitionWriter<T> extends BaseTaskWriter<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionWriter.class);
+
+  private final PartitionSpec spec;
+  private final FileAppenderFactory<T> factory;
+  private final Function<PartitionKey, EncryptedOutputFile> outputFileGetter;
+  private final Function<T, PartitionKey> keyGetter;
+  private final Map<PartitionKey, WrappedFileAppender<T>> writers;
+  private final long targetFileSize;
+  private final FileFormat fileFormat;
+  private final List<DataFile> completeDataFiles;
+
+  PartitionWriter(PartitionSpec spec,
+                  FileAppenderFactory<T> factory,
+                  Function<PartitionKey, EncryptedOutputFile> outputFileGetter,
+                  Function<T, PartitionKey> keyGetter,
+                  long targetFileSize,
+                  FileFormat fileFormat) {
+    this.spec = spec;
+    this.factory = factory;
+    this.outputFileGetter = outputFileGetter;
+    this.keyGetter = keyGetter;
+    this.writers = Maps.newHashMap();
+    this.targetFileSize = targetFileSize;
+    this.fileFormat = fileFormat;
+    this.completeDataFiles = Lists.newArrayList();
+  }
+
+  @Override
+  public void append(T record) throws IOException {
+    PartitionKey partitionKey = keyGetter.apply(record);
+    Preconditions.checkArgument(partitionKey != null, "Partition key shouldn't 
be null");
+
+    WrappedFileAppender<T> writer = writers.get(partitionKey);
+    if (writer == null) {
+      writer = createWrappedFileAppender(partitionKey);
+      writers.put(partitionKey, writer);
+    }
+    writer.fileAppender.add(record);
+
+    // Roll the writer if reach the target file size.
+    writer.currentRows++;
+    if (writer.currentRows % ROW_DIVISOR == 0 && writer.fileAppender.length() 
>= targetFileSize) {
+      closeCurrentWriter(writer);
+      writers.remove(partitionKey);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (WrappedFileAppender<T> wrap : writers.values()) {
+      closeCurrentWriter(wrap);
+      LOG.debug("Close file appender: {}, completeDataFiles: {}",
+          wrap.encryptedOutputFile.encryptingOutputFile().location(),
+          completeDataFiles.size());
+    }
+    this.writers.clear();
+  }
+
+  @Override
+  public List<DataFile> getCompleteFiles() {
+    if (completeDataFiles.size() > 0) {
+      return ImmutableList.copyOf(this.completeDataFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  @Override
+  public void reset() {
+    this.completeDataFiles.clear();
+  }
+
+  private WrappedFileAppender<T> createWrappedFileAppender(PartitionKey 
partitionKey) {
+    EncryptedOutputFile outputFile = outputFileGetter.apply(partitionKey);
+    FileAppender<T> appender = 
factory.newAppender(outputFile.encryptingOutputFile(), fileFormat);
+    return new WrappedFileAppender<>(partitionKey, outputFile, appender);
+  }
+
+  private void closeCurrentWriter(WrappedFileAppender<T> wrap) throws 
IOException {
+    DataFile dataFile = closeFileAppender(wrap.fileAppender, 
wrap.encryptedOutputFile, spec, wrap.partitionKey);
+    completeDataFiles.add(dataFile);
+  }
+
+  private static class WrappedFileAppender<T> {

Review comment:
       Extract this class for `NonPartitionWriter` too?

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+
+  protected DataFile closeFileAppender(FileAppender<T> fileAppender, 
EncryptedOutputFile currentFile,
+                                       PartitionSpec spec, StructLike 
partitionKey) throws IOException {
+    // Close the file appender firstly.
+    fileAppender.close();
+
+    // metrics are only valid after the appender is closed.
+    Metrics metrics = fileAppender.metrics();
+    long fileSizeInBytes = fileAppender.length();
+    List<Long> splitOffsets = fileAppender.splitOffsets();
+
+    // Construct the DataFile and add it into the completeDataFiles.
+    return DataFiles.builder(spec)
+        .withEncryptedOutputFile(currentFile)
+        .withPath(currentFile.encryptingOutputFile().location())
+        .withFileSizeInBytes(fileSizeInBytes)
+        .withPartition(spec.fields().size() == 0 ? null : partitionKey)

Review comment:
       `spec.fields().size() == 0 ? null : partitionKey` just be `partitionKey`?

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/writer/UnpartitionedWriter.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+
+class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private final FileAppenderFactory<T> factory;
+  private final Supplier<EncryptedOutputFile> outputFileSupplier;
+  private final long targetFileSize;
+  private final FileFormat fileFormat;
+  private final List<DataFile> completeDataFiles;
+
+  private long currentRows = 0;
+  private EncryptedOutputFile currentOutputFile;
+  private FileAppender<T> currentAppender = null;
+
+  UnpartitionedWriter(FileAppenderFactory<T> factory,
+                      Supplier<EncryptedOutputFile> outputFileSupplier,
+                      long targetFileSize,
+                      FileFormat fileFormat) {
+    this.factory = factory;
+    this.outputFileSupplier = outputFileSupplier;
+    this.targetFileSize = targetFileSize;
+    this.fileFormat = fileFormat;
+    this.completeDataFiles = new ArrayList<>();
+  }
+
+  @Override
+  public void append(T record) throws IOException {
+    if (currentAppender == null) {
+      currentOutputFile = outputFileSupplier.get();
+      currentAppender = 
factory.newAppender(currentOutputFile.encryptingOutputFile(), fileFormat);
+    }
+    currentAppender.add(record);
+
+    // Roll the writer if reach the target file size.
+    currentRows++;
+    if (currentRows % ROW_DIVISOR == 0 && currentAppender.length() >= 
targetFileSize) {

Review comment:
       Can you add comments to explain why need align to `ROW_DIVISOR`?

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/writer/PartitionWriter.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.flink.PartitionKey;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The PartitionFanoutWriter will open a writing data file for each partition 
and route the given record to the
+ * corresponding data file in the correct partition.
+ *
+ * @param <T> defines the data type of record to write.
+ */
+class PartitionWriter<T> extends BaseTaskWriter<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionWriter.class);
+
+  private final PartitionSpec spec;
+  private final FileAppenderFactory<T> factory;
+  private final Function<PartitionKey, EncryptedOutputFile> outputFileGetter;
+  private final Function<T, PartitionKey> keyGetter;
+  private final Map<PartitionKey, WrappedFileAppender<T>> writers;
+  private final long targetFileSize;
+  private final FileFormat fileFormat;
+  private final List<DataFile> completeDataFiles;
+
+  PartitionWriter(PartitionSpec spec,
+                  FileAppenderFactory<T> factory,
+                  Function<PartitionKey, EncryptedOutputFile> outputFileGetter,
+                  Function<T, PartitionKey> keyGetter,
+                  long targetFileSize,
+                  FileFormat fileFormat) {
+    this.spec = spec;
+    this.factory = factory;
+    this.outputFileGetter = outputFileGetter;
+    this.keyGetter = keyGetter;
+    this.writers = Maps.newHashMap();
+    this.targetFileSize = targetFileSize;
+    this.fileFormat = fileFormat;
+    this.completeDataFiles = Lists.newArrayList();
+  }
+
+  @Override
+  public void append(T record) throws IOException {
+    PartitionKey partitionKey = keyGetter.apply(record);
+    Preconditions.checkArgument(partitionKey != null, "Partition key shouldn't 
be null");
+
+    WrappedFileAppender<T> writer = writers.get(partitionKey);
+    if (writer == null) {
+      writer = createWrappedFileAppender(partitionKey);
+      writers.put(partitionKey, writer);
+    }
+    writer.fileAppender.add(record);
+
+    // Roll the writer if reach the target file size.
+    writer.currentRows++;
+    if (writer.currentRows % ROW_DIVISOR == 0 && writer.fileAppender.length() 
>= targetFileSize) {
+      closeCurrentWriter(writer);
+      writers.remove(partitionKey);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (WrappedFileAppender<T> wrap : writers.values()) {
+      closeCurrentWriter(wrap);
+      LOG.debug("Close file appender: {}, completeDataFiles: {}",
+          wrap.encryptedOutputFile.encryptingOutputFile().location(),
+          completeDataFiles.size());
+    }
+    this.writers.clear();
+  }
+
+  @Override
+  public List<DataFile> getCompleteFiles() {
+    if (completeDataFiles.size() > 0) {
+      return ImmutableList.copyOf(this.completeDataFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  @Override
+  public void reset() {
+    this.completeDataFiles.clear();
+  }
+
+  private WrappedFileAppender<T> createWrappedFileAppender(PartitionKey 
partitionKey) {
+    EncryptedOutputFile outputFile = outputFileGetter.apply(partitionKey);
+    FileAppender<T> appender = 
factory.newAppender(outputFile.encryptingOutputFile(), fileFormat);
+    return new WrappedFileAppender<>(partitionKey, outputFile, appender);
+  }
+
+  private void closeCurrentWriter(WrappedFileAppender<T> wrap) throws 
IOException {
+    DataFile dataFile = closeFileAppender(wrap.fileAppender, 
wrap.encryptedOutputFile, spec, wrap.partitionKey);
+    completeDataFiles.add(dataFile);
+  }
+
+  private static class WrappedFileAppender<T> {
+    private final PartitionKey partitionKey;

Review comment:
       We don't need store `partitionKey`, it is already in map?

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/writer/PartitionWriter.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.flink.PartitionKey;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The PartitionFanoutWriter will open a writing data file for each partition 
and route the given record to the
+ * corresponding data file in the correct partition.
+ *
+ * @param <T> defines the data type of record to write.
+ */
+class PartitionWriter<T> extends BaseTaskWriter<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionWriter.class);
+
+  private final PartitionSpec spec;
+  private final FileAppenderFactory<T> factory;
+  private final Function<PartitionKey, EncryptedOutputFile> outputFileGetter;
+  private final Function<T, PartitionKey> keyGetter;
+  private final Map<PartitionKey, WrappedFileAppender<T>> writers;
+  private final long targetFileSize;
+  private final FileFormat fileFormat;
+  private final List<DataFile> completeDataFiles;
+
+  PartitionWriter(PartitionSpec spec,
+                  FileAppenderFactory<T> factory,
+                  Function<PartitionKey, EncryptedOutputFile> outputFileGetter,
+                  Function<T, PartitionKey> keyGetter,
+                  long targetFileSize,
+                  FileFormat fileFormat) {
+    this.spec = spec;
+    this.factory = factory;
+    this.outputFileGetter = outputFileGetter;
+    this.keyGetter = keyGetter;
+    this.writers = Maps.newHashMap();
+    this.targetFileSize = targetFileSize;
+    this.fileFormat = fileFormat;
+    this.completeDataFiles = Lists.newArrayList();
+  }
+
+  @Override
+  public void append(T record) throws IOException {
+    PartitionKey partitionKey = keyGetter.apply(record);
+    Preconditions.checkArgument(partitionKey != null, "Partition key shouldn't 
be null");
+
+    WrappedFileAppender<T> writer = writers.get(partitionKey);

Review comment:
       `computeIfAbsent`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to