openinx commented on a change in pull request #1623:
URL: https://github.com/apache/iceberg/pull/1623#discussion_r514774682



##########
File path: 
core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -268,7 +272,7 @@ private void replaceDataFiles(Iterable<DataFile> 
deletedDataFiles, Iterable<Data
     }
   }
 
-  protected abstract FileIO fileIO();
+  protected abstract FileIO setFileIO();

Review comment:
       Why `setFileIO`? That's quite confusing: the method returns the 
`FileIO`, yet the name reads like a setter. I don't think we need to change 
this. 

##########
File path: flink/src/main/java/org/apache/iceberg/actions/Actions.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;

Review comment:
       I think we may need to move this class into 
`org.apache.iceberg.flink.actions`. Flink and Spark both have a class named 
`Actions` under the package `org.apache.iceberg.actions`, so how would someone 
tell the two apart if they want to run actions for both Flink and Spark in the 
same project? 

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, 
EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), 
DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), 
TableProperties.DEFAULT_FILE_FORMAT,

Review comment:
       nit: could we put this `format` parsing into its own code block by 
adding an empty line before it? That makes the code clearer.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, 
EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), 
DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), 
TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    this.taskWriterFactory = new RowDataTaskWriterFactory(
+        table.schema(),
+        flinkSchema,
+        table.spec(),
+        table.locationProvider(),
+        io,
+        encryptionManager,
+        Long.MAX_VALUE,
+        format,
+        table.properties());
+  }
+
+  public List<DataFile> rewriteDataForTasks(DataSet<CombinedScanTask> dataSet) 
throws Exception {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, 
encryptionManager, taskWriterFactory);
+    DataSet<List<DataFile>> ds = dataSet.map(map);
+    return 
ds.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
+  }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, 
List<DataFile>> {
+
+    private transient TaskWriter<RowData> writer;
+    private transient int subTaskId;
+    private transient int attemptId;
+
+    private final Schema schema;
+    private final String nameMapping;
+    private final FileIO io;
+    private final boolean caseSensitive;
+    private final EncryptionManager encryptionManager;
+    private final TaskWriterFactory<RowData> taskWriterFactory;
+
+    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean 
caseSensitive,
+                      EncryptionManager encryptionManager, 
TaskWriterFactory<RowData> taskWriterFactory) {
+      this.schema = schema;
+      this.nameMapping = nameMapping;
+      this.io = io;
+      this.caseSensitive = caseSensitive;
+      this.encryptionManager = encryptionManager;
+      this.taskWriterFactory = taskWriterFactory;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+      this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+      this.attemptId = getRuntimeContext().getAttemptNumber();
+      // Initialize the task writer factory.
+      this.taskWriterFactory.initialize(subTaskId, attemptId);
+    }
+
+    @Override
+    public List<DataFile> map(CombinedScanTask task) throws Exception {
+      // Initialize the task writer.
+      this.writer = taskWriterFactory.create();
+      RowDataIterator iterator =
+          new RowDataIterator(task, io, encryptionManager, schema, schema, 
nameMapping, caseSensitive);
+      try {
+        while (iterator.hasNext()) {
+          RowData rowData = iterator.next();
+          writer.write(rowData);
+        }
+        iterator.close();
+        return Lists.newArrayList(writer.complete());
+      } catch (Throwable originalThrowable) {
+        try {

Review comment:
       I think we forgot to close the `RowDataIterator` if an exception is 
thrown while iterating the records. Please handle this carefully: if `close` 
itself is what failed, we don't want to close it again in the `catch` block.
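
       One way to get this, sketched with hypothetical stand-ins 
(`CountingIterator` for `RowDataIterator`, `RecordingWriter` for `TaskWriter`; 
neither is the real Iceberg API): let try-with-resources own the iterator so it 
is closed exactly once on both the success and the failure path, and keep the 
`catch` block for aborting the writer only. This assumes the iterator can be 
used as an `AutoCloseable`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class CloseOnceSketch {

  /** Hypothetical stand-in for RowDataIterator; counts how often close() runs. */
  static final class CountingIterator implements Iterator<String>, AutoCloseable {
    private final Iterator<String> delegate;
    private final boolean failOnNext;
    int closeCalls = 0;

    CountingIterator(List<String> rows, boolean failOnNext) {
      this.delegate = rows.iterator();
      this.failOnNext = failOnNext;
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public String next() {
      if (failOnNext) {
        throw new IllegalStateException("read failed");
      }
      return delegate.next();
    }

    @Override
    public void close() {
      closeCalls++;
    }
  }

  /** Hypothetical stand-in for TaskWriter. */
  static final class RecordingWriter {
    final List<String> written = new ArrayList<>();
    boolean aborted = false;

    void write(String row) {
      written.add(row);
    }

    List<String> complete() {
      return written;
    }

    void abort() {
      aborted = true;
    }
  }

  static List<String> rewrite(CountingIterator iterator, RecordingWriter writer) {
    try {
      // try-with-resources closes the iterator once, whether or not the loop throws.
      try (CountingIterator rows = iterator) {
        while (rows.hasNext()) {
          writer.write(rows.next());
        }
      }
      return writer.complete();
    } catch (RuntimeException e) {
      // The iterator is already closed here; only the writer needs cleanup.
      writer.abort();
      throw e;
    }
  }
}
```

If `close` throws while another exception is already propagating, 
try-with-resources attaches the close failure as a suppressed exception, so the 
`catch` block never has to call `close` a second time.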

##########
File path: 
flink/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {

Review comment:
       I think we should also provide a unit test similar to Spark's 
`testRewriteLargeTableHasResiduals`, which tests the `ignoreResiduals` option.  

##########
File path: 
flink/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends

Review comment:
       nit: no need to wrap the line here.

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, 
EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), 
DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), 
TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));

Review comment:
       nit: could we keep the constructor's assignments in the same order as 
the field definitions? For example:
   
   ```java
   private final int a;
   private final int b;
   
   public Construct(int a, int b){
       this.a = a;
       this.b = b;
   }
   
   ```
   

##########
File path: core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
##########
@@ -52,4 +52,13 @@ public static long propertyAsLong(Map<String, String> 
properties,
     }
     return defaultValue;
   }
+
+  public static String propertyAsString(Map<String, String> properties,
+                                    String property, String defaultValue) {

Review comment:
       nit:  code format ? 

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {

Review comment:
       Do we need to make this class public? Since Apache Iceberg is a 
library, we try to avoid exposing unnecessary internal classes or interfaces, 
so that users won't misuse them. 

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, 
EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), 
DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), 
TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    this.taskWriterFactory = new RowDataTaskWriterFactory(
+        table.schema(),
+        flinkSchema,
+        table.spec(),
+        table.locationProvider(),
+        io,
+        encryptionManager,
+        Long.MAX_VALUE,
+        format,
+        table.properties());
+  }
+
+  public List<DataFile> rewriteDataForTasks(DataSet<CombinedScanTask> dataSet) 
throws Exception {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, 
encryptionManager, taskWriterFactory);
+    DataSet<List<DataFile>> ds = dataSet.map(map);
+    return 
ds.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
+  }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, 
List<DataFile>> {
+
+    private transient TaskWriter<RowData> writer;

Review comment:
       nit: could this just be a local variable? It doesn't have to be a 
transient field.
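
       A minimal sketch of what I mean, using hypothetical stand-ins (`Writer` 
for `TaskWriter<RowData>`, `RewriteFunction` for `RewriteMap`, and a plain 
`Supplier` in place of the task writer factory): the function keeps only the 
factory as state and creates the writer inside `map`, so nothing about the 
writer outlives a single call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class LocalWriterSketch {

  /** Hypothetical stand-in for TaskWriter<RowData>. */
  static final class Writer {
    private final List<String> rows = new ArrayList<>();

    void write(String row) {
      rows.add(row);
    }

    List<String> complete() {
      return new ArrayList<>(rows);
    }
  }

  /** Hypothetical stand-in for RewriteMap; the factory is the only field. */
  static final class RewriteFunction {
    private final Supplier<Writer> writerFactory;

    RewriteFunction(Supplier<Writer> writerFactory) {
      this.writerFactory = writerFactory;
    }

    List<String> map(List<String> task) {
      // The writer is scoped to this call, so no transient field is needed.
      Writer writer = writerFactory.get();
      for (String row : task) {
        writer.write(row);
      }
      return writer.complete();
    }
  }
}
```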

##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, 
EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), 
DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), 
TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    this.taskWriterFactory = new RowDataTaskWriterFactory(
+        table.schema(),
+        flinkSchema,
+        table.spec(),
+        table.locationProvider(),
+        io,
+        encryptionManager,
+        Long.MAX_VALUE,
+        format,
+        table.properties());
+  }
+
+  public List<DataFile> rewriteDataForTasks(DataSet<CombinedScanTask> dataSet) 
throws Exception {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, 
encryptionManager, taskWriterFactory);
+    DataSet<List<DataFile>> ds = dataSet.map(map);
+    return 
ds.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
+  }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, 
List<DataFile>> {
+
+    private transient TaskWriter<RowData> writer;
+    private transient int subTaskId;
+    private transient int attemptId;
+
+    private final Schema schema;
+    private final String nameMapping;
+    private final FileIO io;
+    private final boolean caseSensitive;
+    private final EncryptionManager encryptionManager;
+    private final TaskWriterFactory<RowData> taskWriterFactory;
+
+    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean 
caseSensitive,
+                      EncryptionManager encryptionManager, 
TaskWriterFactory<RowData> taskWriterFactory) {
+      this.schema = schema;
+      this.nameMapping = nameMapping;
+      this.io = io;
+      this.caseSensitive = caseSensitive;
+      this.encryptionManager = encryptionManager;
+      this.taskWriterFactory = taskWriterFactory;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+      this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+      this.attemptId = getRuntimeContext().getAttemptNumber();
+      // Initialize the task writer factory.
+      this.taskWriterFactory.initialize(subTaskId, attemptId);
+    }
+
+    @Override
+    public List<DataFile> map(CombinedScanTask task) throws Exception {
+      // Initialize the task writer.
+      this.writer = taskWriterFactory.create();
+      RowDataIterator iterator =
+          new RowDataIterator(task, io, encryptionManager, schema, schema, 
nameMapping, caseSensitive);
+      try {
+        while (iterator.hasNext()) {
+          RowData rowData = iterator.next();
+          writer.write(rowData);
+        }
+        iterator.close();
+        return Lists.newArrayList(writer.complete());
+      } catch (Throwable originalThrowable) {
+        try {
+          LOG.error("Aborting commit for  (subTaskId {}, attemptId {})", 
subTaskId, attemptId);
+          writer.abort();
+          LOG.error("Aborted commit for  (subTaskId {}, attemptId {})", 
subTaskId, attemptId);
+        } catch (Throwable inner) {
+          if (originalThrowable != inner) {
+            originalThrowable.addSuppressed(inner);

Review comment:
       What's the reason we need to suppress the `inner` exception when it's 
different from the `originalThrowable`? 
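
       For reference, `Throwable.addSuppressed` throws 
`IllegalArgumentException` when a throwable is asked to suppress itself, which 
may be what the identity check is guarding against. A small sketch of that 
behaviour (the class and method names are made up for illustration):

```java
final class SuppressedSketch {

  /** Attaches the cleanup failure to the original one unless they are the same instance. */
  static RuntimeException merge(RuntimeException original, RuntimeException cleanupFailure) {
    if (original != cleanupFailure) {
      original.addSuppressed(cleanupFailure);
    }
    return original;
  }

  /** Returns true if the JDK rejects an exception that tries to suppress itself. */
  static boolean selfSuppressionRejected() {
    RuntimeException e = new RuntimeException("boom");
    try {
      e.addSuppressed(e);
      return false;
    } catch (IllegalArgumentException expected) {
      return true;
    }
  }
}
```

With the guard, the caller still sees the original failure first and can read 
the abort failure from `getSuppressed()`.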

##########
File path: 
flink/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME_UNPARTIITONED = 
"test_table_unpartitioned";
+  private static final String TABLE_NAME_PARTIITONED = 
"test_table_partitioned";
+  private final FileFormat format;
+  private Table icebergTableUnPartitioned;
+  private Table icebergTablePartitioned;
+
+  public TestRewriteDataFilesAction(String catalogName, String[] 
baseNamespace, FileFormat format) {
+    super(catalogName, baseNamespace);
+    this.format = format;
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, 
format={2}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.ORC, 
FileFormat.AVRO, FileFormat.PARQUET}) {
+      for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+        String catalogName = (String) catalogParams[0];
+        String[] baseNamespace = (String[]) catalogParams[1];
+        parameters.add(new Object[] {catalogName, baseNamespace, format});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+    sql("CREATE TABLE %s (id int, data varchar) with 
('write.format.default'='%s')", TABLE_NAME_UNPARTIITONED,
+        format.name());
+    icebergTableUnPartitioned = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_UNPARTIITONED));
+
+    sql("CREATE TABLE %s (id int, data varchar)  PARTITIONED BY (data) with 
('write.format.default'='%s')",
+        TABLE_NAME_PARTIITONED, format.name());
+    icebergTablePartitioned = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_PARTIITONED));
+  }
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTIITONED);
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTIITONED);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testRewriteDataFilesEmptyTable() throws Exception {
+    Assert.assertNull("Table must be empty", 
icebergTableUnPartitioned.currentSnapshot());
+    Actions.forTable(icebergTableUnPartitioned)
+        .rewriteDataFiles()
+        .execute();
+    Assert.assertNull("Table must stay empty", 
icebergTableUnPartitioned.currentSnapshot());
+  }
+
+
+  @Test
+  public void testRewriteDataFilesUnpartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTIITONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTIITONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = 
icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, 
dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTableUnPartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = 
icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 1 data files after rewrite", 1, 
dataFiles1.size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, 
Lists.newArrayList(
+        SimpleDataUtil.createRecord(1, "hello"),
+        SimpleDataUtil.createRecord(2, "world")
+    ));
+  }
+
+  @Test
+  public void testRewriteDataFilesPartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 2, 'hello' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 3, 'world' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 4, 'world' ", TABLE_NAME_PARTIITONED);
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, 
dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTablePartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 4 data files", 4, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, 
result.addedDataFiles().size());
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, 
dataFiles1.size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, 
Lists.newArrayList(
+        SimpleDataUtil.createRecord(1, "hello"),
+        SimpleDataUtil.createRecord(2, "hello"),
+        SimpleDataUtil.createRecord(3, "world"),
+        SimpleDataUtil.createRecord(4, "world")
+    ));
+  }
+
+
+  @Test
+  public void testRewriteDataFilesWithFilter() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 1, 'hello' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 1, 'world' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 2, 'world' ", TABLE_NAME_PARTIITONED);
+    sql("INSERT INTO %s SELECT 3, 'world' ", TABLE_NAME_PARTIITONED);
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 5 data files before rewrite", 5, 
dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTablePartitioned)
+            .rewriteDataFiles()
+            .filter(Expressions.equal("id", 1))
+            .filter(Expressions.startsWith("data", "he"))
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files after rewrite", 4, 
dataFiles1.size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, 
Lists.newArrayList(

Review comment:
       I think we may need to refactor `SimpleDataUtil.assertTableRecords`. 
It uses a `Set` to check the expected records, so identical records are 
de-duplicated automatically and the check cannot detect the case where the 
table should contain two `<1, 'hello'>` records. 
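
       To illustrate the concern, here is a minimal sketch that contrasts a 
`Set`-based comparison with one that counts occurrences. It uses plain strings 
as stand-ins for records and only JDK collections. `DuplicateAwareCompareSketch`, 
`sameAsSets`, and `sameWithDuplicates` are hypothetical names, and this is not 
the actual `SimpleDataUtil` implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Illustrative only: strings stand in for table records.
final class DuplicateAwareCompareSketch {

  // Counts how many times each item occurs, ignoring order.
  static <T> Map<T, Integer> counts(Iterable<T> items) {
    Map<T, Integer> result = new HashMap<>();
    for (T item : items) {
      result.merge(item, 1, Integer::sum);
    }
    return result;
  }

  // Set-based check: duplicates collapse, so a missing duplicate goes unnoticed.
  static <T> boolean sameAsSets(List<T> expected, List<T> actual) {
    return new HashSet<>(expected).equals(new HashSet<>(actual));
  }

  // Count-based check: order is ignored, but the number of duplicates must match.
  static <T> boolean sameWithDuplicates(List<T> expected, List<T> actual) {
    return counts(expected).equals(counts(actual));
  }

  public static void main(String[] args) {
    List<String> expected = Arrays.asList("1,hello", "1,hello", "1,world");
    List<String> actual = Arrays.asList("1,hello", "1,world"); // one row lost

    System.out.println(sameAsSets(expected, actual));
    System.out.println(sameWithDuplicates(expected, actual));
  }
}
```

       A count-based comparison keeps the order-insensitive behavior of the 
`Set` approach while still failing when a duplicated row is dropped by the 
rewrite.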

##########
File path: flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
##########
@@ -72,7 +72,7 @@ protected TableEnvironment getTableEnv() {
     return tEnv;
   }
 
-  List<Object[]> sql(String query, Object... args) {
+  public List<Object[]> sql(String query, Object... args) {

Review comment:
       How about using `protected`? 



