openinx commented on a change in pull request #1213:
URL: https://github.com/apache/iceberg/pull/1213#discussion_r460634798



##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    writer.add(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!writers.isEmpty()) {
+      Iterator<RollingFileAppender> iterator = writers.values().iterator();
+      while (iterator.hasNext()) {
+        iterator.next().close();
+        // Remove from the writers after closed.
+        iterator.remove();

Review comment:
       OK, sounds good.
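
   For context, a small self-contained sketch of the hazard that the `NOTICE` comment in `write(...)` above guards against: the key returned by `partition(row)` is reused and mutated in place, so it must be copied before being stored as a `HashMap` key. The `MutableKey` class below is a hypothetical stand-in, not Iceberg's `PartitionKey`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a reusable, mutable key such as PartitionKey.
class MutableKey {
  private String value = "";

  void set(String value) { this.value = value; }   // mutated in place, like partition(row)

  MutableKey copy() {
    MutableKey copied = new MutableKey();
    copied.value = this.value;
    return copied;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof MutableKey && Objects.equals(((MutableKey) other).value, value);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(value);
  }
}

public class KeyCopyDemo {
  public static void main(String[] args) {
    Map<MutableKey, String> writers = new HashMap<>();
    MutableKey reused = new MutableKey();

    reused.set("2020-07-01");
    writers.put(reused.copy(), "writer-A");   // the copy keeps this entry's hash stable

    reused.set("2020-07-02");                 // without the copy, this mutation would leave
    writers.put(reused.copy(), "writer-B");   // the first entry stranded in the wrong bucket

    System.out.println(writers.size());       // prints 2
  }
}
```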

##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public List<DataFile> complete() throws IOException {
+    close();
+
+    if (completedFiles.size() > 0) {
+      return ImmutableList.copyOf(completedFiles);
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  class RollingFileAppender implements Closeable {

Review comment:
       Well, sounds great.
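
   For readers skimming the thread, a minimal sketch of the write/complete/abort contract that the `BaseTaskWriter` diff above implies. It assumes the `TaskWriter` interface lives in `org.apache.iceberg.io` as this PR suggests; how the writer is constructed (e.g. via a factory and an `OutputFileFactory`) is assumed rather than shown.

```java
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.io.TaskWriter;

class TaskWriterLifecycle {
  // Write all rows, then either hand back the finished data files or clean up on failure.
  static <T> List<DataFile> writeAll(TaskWriter<T> writer, Iterable<T> rows) throws Exception {
    try {
      for (T row : rows) {
        writer.write(row);
      }
      return writer.complete();  // closes open appenders and returns the finished data files
    } catch (Exception e) {
      writer.abort();            // closes the writer and deletes any files it already created
      throw e;
    }
  }
}
```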

##########
File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private RollingFileAppender currentAppender = null;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  @Override
+  public void write(T record) throws IOException {
+    if (currentAppender == null) {
+      currentAppender = new RollingFileAppender(null);

Review comment:
       I refactored this part because we don't need to initialize a real writer if no records come in.
Before this patch, it would open a real file writer even when there were no records to write, and in
the end we would need to close this useless writer and clean up its file.
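
   A minimal, self-contained illustration of the lazy-open pattern described here; it uses `java.nio` instead of `RollingFileAppender`, so the class and field names below are stand-ins rather than the PR's actual types.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Simplified stand-in for the lazy-open idea; the real UnpartitionedWriter
// wraps a RollingFileAppender rather than a java.nio Writer.
class LazyFileWriter implements Closeable {
  private final Path target;
  private Writer writer;   // stays null until the first record arrives

  LazyFileWriter(Path target) {
    this.target = target;
  }

  void write(String record) throws IOException {
    if (writer == null) {
      // Open the output file only when there is actually something to write,
      // so an idle task never creates a file it would later have to delete.
      writer = Files.newBufferedWriter(target);
    }
    writer.write(record);
    writer.write('\n');
  }

  @Override
  public void close() throws IOException {
    if (writer != null) {
      writer.close();   // nothing to close (and no file on disk) if no record ever came in
    }
  }

  public static void main(String[] args) throws IOException {
    try (LazyFileWriter idle = new LazyFileWriter(Paths.get("never-written.txt"))) {
      // closing without writing leaves no file behind
    }
  }
}
```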

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
##########
@@ -92,16 +96,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
     RowDataReader dataReader = new RowDataReader(
         task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
 
-    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
-        properties, schema, SparkSchemaUtil.convert(schema));
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType);

Review comment:
       OK, let me revert this.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.flink.data.FlinkAvroWriter;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.parquet.Parquet;
+
+class TaskWriterFactory {
+  private TaskWriterFactory() {
+  }
+
+  static TaskWriter<Row> createTaskWriter(Schema schema,
+                                          PartitionSpec spec,
+                                          FileFormat format,
+                                          FileAppenderFactory<Row> appenderFactory,
+                                          OutputFileFactory fileFactory,
+                                          FileIO io,
+                                          long targetFileSizeBytes) {
+    if (spec.fields().isEmpty()) {
+      return new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes);
+    } else {
+      return new RowPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory,
+          io, targetFileSizeBytes, schema);
+    }
+  }
+
+  private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> {
+
+    private final PartitionKey partitionKey;
+    private final RowWrapper rowWrapper;
+
+    RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory,
+                               OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+      this.partitionKey = new PartitionKey(spec, schema);
+      this.rowWrapper = new RowWrapper(schema.asStruct());
+    }
+
+    @Override
+    protected PartitionKey partition(Row row) {
+      partitionKey.partition(rowWrapper.wrap(row));
+      return partitionKey;
+    }
+  }
+
+  static class FlinkFileAppenderFactory implements FileAppenderFactory<Row> {
+    private final Schema schema;
+    private final Map<String, String> props;
+
+    FlinkFileAppenderFactory(Schema schema, Map<String, String> props) {
+      this.schema = schema;
+      this.props = props;
+    }
+
+    @Override
+    public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) {

Review comment:
       I see those PRs. I'd prefer to keep the current version so that we can first introduce the Flink
unit tests, which will make that big change easier to address. Changing it to RowData should be easy
in the future, I think.

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
##########
@@ -250,33 +253,42 @@ public String toString() {
       if (spec.fields().isEmpty()) {
         return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
       } else {
-        return new Partitioned24Writer(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
+        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
+            targetFileSize, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> {
+  private static class Unpartitioned24Writer extends UnpartitionedWriter<InternalRow>
+      implements DataWriter<InternalRow> {
     Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                           OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      return new TaskCommit(complete());
+      this.close();
+
+      List<DataFile> dataFiles = complete();
+      return new TaskCommit(new TaskResult(dataFiles));

Review comment:
       OK

##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap();
+
+  public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                                 OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileAppender writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileAppender(copiedKey);
+      writers.put(copiedKey, writer);

Review comment:
       > During scan planning, IIUC, an inclusive projection could possibly match a very large number of rows that might fall outside of the predicate range if the RollingFileAppender for this rarely observed predicate at this Task Manager buffers its data for a very long time before writing (say days or even weeks in a longer running streaming query).
   
   You mean the flink streaming reader won't see buffered data that has not yet been committed to the iceberg table? Actually, that's exactly the expected behavior. Say we have a data pipeline:
   
   ```
   (flink-streaming-sink-job-A) -> (iceberg table) -> (flink-streaming-reader-job-B)
   ```
   
   The upstream `flink-streaming-sink-job-A` will append records to the iceberg table continuously and commit them whenever a checkpoint happens. We need to guarantee transactional semantics, so the downstream flink streaming reader can only see committed iceberg data; the delta between two contiguous snapshots is the incremental data that the flink streaming reader should consume.
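
   To make the commit step concrete, a hedged sketch of what job A would do once a checkpoint completes, using the `TaskWriter#complete()` from this PR together with the existing `AppendFiles` API. The surrounding Flink checkpoint wiring, and how `writer` and `table` are obtained (e.g. via `TaskWriterFactory` and a catalog), are assumptions, not part of this PR.

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.TaskWriter;

class CheckpointCommit {
  // Called when a checkpoint completes; typically run by a single committer task.
  static <T> void commitOnCheckpoint(TaskWriter<T> writer, Table table) throws IOException {
    List<DataFile> dataFiles = writer.complete();  // flush open appenders for this checkpoint

    AppendFiles append = table.newAppend();
    for (DataFile file : dataFiles) {
      append.appendFile(file);
    }
    append.commit();  // only after this commit can the downstream streaming reader see these rows
  }
}
```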
   
   

##########
File path: core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
##########
@@ -17,41 +17,44 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PartitionedWriter extends BaseWriter {
+public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
 
-  private final PartitionKey key;
-  private final InternalRowWrapper wrapper;
   private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
 
-  PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                    OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+  private PartitionKey currentKey = null;
+  private RollingFileAppender currentAppender = null;

Review comment:
       Nice finding. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
