This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f9f5bcf  Flink:  Add a rewrite datafile action for flink (#1623)
f9f5bcf is described below

commit f9f5bcfe66be2c808a4f14f59c195f92c31de8fa
Author: JunZhang <[email protected]>
AuthorDate: Wed Nov 4 19:34:10 2020 +0800

    Flink:  Add a rewrite datafile action for flink (#1623)
---
 .../java/org/apache/iceberg/util/PropertyUtil.java |   9 +
 .../Actions.java}                                  |  49 ++--
 .../flink/actions/RewriteDataFilesAction.java      |  68 +++++
 .../iceberg/flink/sink/TaskWriterFactory.java      |   2 +-
 .../iceberg/flink/source/RowDataRewriter.java      | 155 +++++++++++
 .../org/apache/iceberg/flink/FlinkTestBase.java    |   2 +-
 .../flink/actions/TestRewriteDataFilesAction.java  | 283 +++++++++++++++++++++
 .../iceberg/actions/RewriteDataFilesAction.java    |   6 +-
 8 files changed, 547 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index 5a5fd6f..2df88c0 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -52,4 +52,13 @@ public class PropertyUtil {
     }
     return defaultValue;
   }
+
+  public static String propertyAsString(Map<String, String> properties,
+                                        String property, String defaultValue) {
+    String value = properties.get(property);
+    if (value != null) {
+      return value;  // reuse the value already fetched instead of a second map lookup
+    }
+    return defaultValue;
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
similarity index 52%
copy from flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
copy to flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
index 6ed7696..3fc2bb7 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
@@ -17,30 +17,31 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink.sink;
+package org.apache.iceberg.flink.actions;
 
-import java.io.Serializable;
-import org.apache.iceberg.io.TaskWriter;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.Table;
+
+public class Actions {
+
+  private StreamExecutionEnvironment env;
+  private Table table;
+
+  private Actions(StreamExecutionEnvironment env, Table table) {
+    this.env = env;
+    this.table = table;
+  }
+
+  public static Actions forTable(StreamExecutionEnvironment env, Table table) {
+    return new Actions(env, table);
+  }
+
+  public static Actions forTable(Table table) {
+    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), 
table);
+  }
+
+  public RewriteDataFilesAction rewriteDataFiles() {
+    return new RewriteDataFilesAction(env, table);
+  }
 
-/**
- * Factory to create {@link TaskWriter}
- *
- * @param <T> data type of record.
- */
-interface TaskWriterFactory<T> extends Serializable {
-
-  /**
-   * Initialize the factory with a given taskId and attemptId.
-   *
-   * @param taskId    the identifier of task.
-   * @param attemptId the attempt id of this task.
-   */
-  void initialize(int taskId, int attemptId);
-
-  /**
-   * Initialize a {@link TaskWriter} with given task id and attempt id.
-   *
-   * @return a newly created task writer.
-   */
-  TaskWriter<T> create();
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
new file mode 100644
index 0000000..12a8000
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class RewriteDataFilesAction extends 
BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private StreamExecutionEnvironment env;
+  private int maxParallelism;
+
+  public RewriteDataFilesAction(StreamExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+    this.maxParallelism = env.getParallelism();
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> 
combinedScanTasks) {
+    int size = combinedScanTasks.size();
+    int parallelism = Math.min(size, maxParallelism);
+    DataStream<CombinedScanTask> dataStream = 
env.fromCollection(combinedScanTasks);
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), 
caseSensitive(), fileIO(), encryptionManager());
+    List<DataFile> addedDataFiles = 
rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+    return addedDataFiles;
+  }
+
+  @Override
+  protected RewriteDataFilesAction self() {
+    return this;
+  }
+
+  public void maxParallelism(int parallelism) {
+    Preconditions.checkArgument(parallelism > 0, "Invalid max parallelism %s", parallelism);  // Guava templates only expand %s
+    this.maxParallelism = parallelism;
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
index 6ed7696..9d56ec6 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
@@ -27,7 +27,7 @@ import org.apache.iceberg.io.TaskWriter;
  *
  * @param <T> data type of record.
  */
-interface TaskWriterFactory<T> extends Serializable {
+public interface TaskWriterFactory<T> extends Serializable {
 
   /**
    * Initialize the factory with a given taskId and attemptId.
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
new file mode 100644
index 0000000..65a3ca9
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Schema schema;
+  private final FileFormat format;
+  private final String nameMapping;
+  private final FileIO io;
+  private final boolean caseSensitive;
+  private final EncryptionManager encryptionManager;
+  private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, 
EncryptionManager encryptionManager) {
+    this.schema = table.schema();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), 
DEFAULT_NAME_MAPPING, null);
+
+    String formatString = PropertyUtil.propertyAsString(table.properties(), 
TableProperties.DEFAULT_FILE_FORMAT,
+        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+    this.taskWriterFactory = new RowDataTaskWriterFactory(
+        table.schema(),
+        flinkSchema,
+        table.spec(),
+        table.locationProvider(),
+        io,
+        encryptionManager,
+        Long.MAX_VALUE,
+        format,
+        table.properties());
+  }
+
+  public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> 
dataStream, int parallelism) {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, 
encryptionManager, taskWriterFactory);
+    DataStream<List<DataFile>> ds = 
dataStream.map(map).setParallelism(parallelism);
+    return 
Lists.newArrayList(DataStreamUtils.collect(ds)).stream().flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, 
List<DataFile>> {
+
+    private TaskWriter<RowData> writer;
+    private int subTaskId;
+    private int attemptId;
+
+    private final Schema schema;
+    private final String nameMapping;
+    private final FileIO io;
+    private final boolean caseSensitive;
+    private final EncryptionManager encryptionManager;
+    private final TaskWriterFactory<RowData> taskWriterFactory;
+
+    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean 
caseSensitive,
+                      EncryptionManager encryptionManager, 
TaskWriterFactory<RowData> taskWriterFactory) {
+      this.schema = schema;
+      this.nameMapping = nameMapping;
+      this.io = io;
+      this.caseSensitive = caseSensitive;
+      this.encryptionManager = encryptionManager;
+      this.taskWriterFactory = taskWriterFactory;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+      this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+      this.attemptId = getRuntimeContext().getAttemptNumber();
+      // Initialize the task writer factory.
+      this.taskWriterFactory.initialize(subTaskId, attemptId);
+    }
+
+    @Override
+    public List<DataFile> map(CombinedScanTask task) throws Exception {
+      // Initialize the task writer.
+      this.writer = taskWriterFactory.create();
+      try (RowDataIterator iterator =
+               new RowDataIterator(task, io, encryptionManager, schema, 
schema, nameMapping, caseSensitive)) {
+        while (iterator.hasNext()) {
+          RowData rowData = iterator.next();
+          writer.write(rowData);
+        }
+        return Lists.newArrayList(writer.complete());
+      } catch (Throwable originalThrowable) {
+        try {
+          LOG.error("Aborting commit for  (subTaskId {}, attemptId {})", 
subTaskId, attemptId);
+          writer.abort();
+          LOG.error("Aborted commit for  (subTaskId {}, attemptId {})", 
subTaskId, attemptId);
+        } catch (Throwable inner) {
+          if (originalThrowable != inner) {
+            originalThrowable.addSuppressed(inner);
+            LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), 
inner);
+          }
+        }
+
+        if (originalThrowable instanceof Exception) {
+          throw originalThrowable;
+        } else {
+          throw new RuntimeException(originalThrowable);
+        }
+      }
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index b680af7..6782267 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -72,7 +72,7 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     return tEnv;
   }
 
-  List<Object[]> sql(String query, Object... args) {
+  protected List<Object[]> sql(String query, Object... args) {
     TableResult tableResult = getTableEnv().executeSql(String.format(query, 
args));
     tableResult.getJobClient().ifPresent(c -> {
       try {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
new file mode 100644
index 0000000..748a4ce
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.flink.actions;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME_UNPARTITIONED = 
"test_table_unpartitioned";
+  private static final String TABLE_NAME_PARTITIONED = 
"test_table_partitioned";
+  private final FileFormat format;
+  private Table icebergTableUnPartitioned;
+  private Table icebergTablePartitioned;
+
+  public TestRewriteDataFilesAction(String catalogName, String[] 
baseNamespace, FileFormat format) {
+    super(catalogName, baseNamespace);
+    this.format = format;
+  }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, 
format={2}")
+  public static Iterable<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.AVRO, 
FileFormat.ORC, FileFormat.PARQUET}) {
+      for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+        String catalogName = (String) catalogParams[0];
+        String[] baseNamespace = (String[]) catalogParams[1];
+        parameters.add(new Object[] {catalogName, baseNamespace, format});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+    sql("CREATE TABLE %s (id int, data varchar) with 
('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED,
+        format.name());
+    icebergTableUnPartitioned = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_UNPARTITIONED));
+
+    sql("CREATE TABLE %s (id int, data varchar,spec varchar) " +
+            " PARTITIONED BY (data,spec) with ('write.format.default'='%s')",
+        TABLE_NAME_PARTITIONED, format.name());
+    icebergTablePartitioned = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_PARTITIONED));
+  }
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  @Test
+  public void testRewriteDataFilesEmptyTable() throws Exception {
+    Assert.assertNull("Table must be empty", 
icebergTableUnPartitioned.currentSnapshot());
+    Actions.forTable(icebergTableUnPartitioned)
+        .rewriteDataFiles()
+        .execute();
+    Assert.assertNull("Table must stay empty", 
icebergTableUnPartitioned.currentSnapshot());
+  }
+
+
+  @Test
+  public void testRewriteDataFilesUnpartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = 
icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, 
dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTableUnPartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = 
icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 1 data files after rewrite", 1, 
dataFiles1.size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, 
Lists.newArrayList(
+        SimpleDataUtil.createRecord(1, "hello"),
+        SimpleDataUtil.createRecord(2, "world")
+    ));
+  }
+
+  @Test
+  public void testRewriteDataFilesPartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 3, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, 
dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTablePartitioned)
+            .rewriteDataFiles()
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 4 data files", 4, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data file", 2, 
result.addedDataFiles().size());
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, 
dataFiles1.size());
+
+    // Assert the table records as expected.
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "spec", Types.StringType.get())
+    );
+
+    Record record = GenericRecord.create(schema);
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, 
Lists.newArrayList(
+        record.copy("id", 1, "data", "hello", "spec", "a"),
+        record.copy("id", 2, "data", "hello", "spec", "a"),
+        record.copy("id", 3, "data", "world", "spec", "b"),
+        record.copy("id", 4, "data", "world", "spec", "b")
+    ));
+  }
+
+
+  @Test
+  public void testRewriteDataFilesWithFilter() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 3, 'world' ,'a'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 5, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 5 data files before rewrite", 5, 
dataFiles.size());
+
+    RewriteDataFilesActionResult result =
+        Actions.forTable(icebergTablePartitioned)
+            .rewriteDataFiles()
+            .filter(Expressions.equal("spec", "a"))
+            .filter(Expressions.startsWith("data", "he"))
+            .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = 
icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files after rewrite", 4, 
dataFiles1.size());
+
+    // Assert the table records as expected.
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "spec", Types.StringType.get())
+    );
+
+    Record record = GenericRecord.create(schema);
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, 
Lists.newArrayList(
+        record.copy("id", 1, "data", "hello", "spec", "a"),
+        record.copy("id", 2, "data", "hello", "spec", "a"),
+        record.copy("id", 3, "data", "world", "spec", "a"),
+        record.copy("id", 4, "data", "world", "spec", "b"),
+        record.copy("id", 5, "data", "world", "spec", "b")
+    ));
+  }
+
+  @Test
+  public void testRewriteLargeTableHasResiduals() throws IOException {
+    // all records belong to the same partition
+    List<String> records1 = Lists.newArrayList();
+    List<String> records2 = Lists.newArrayList();
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 100; i++) {
+      int id = i;
+      String data = String.valueOf(i % 3);
+      if (i % 2 == 0) {
+        records1.add("(" + id + ",'" + data + "')");
+      } else {
+        records2.add("(" + id + ",'" + data + "')");
+      }
+      Record record = RECORD.copy();
+      record.setField("id", id);
+      record.setField("data", data);
+      expected.add(record);
+    }
+
+    sql("INSERT INTO %s values " + StringUtils.join(records1, ","), 
TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s values " + StringUtils.join(records2, ","), 
TABLE_NAME_UNPARTITIONED);
+
+    icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan()
+        .ignoreResiduals()
+        .filter(Expressions.equal("data", "0"))
+        .planFiles();
+    for (FileScanTask task : tasks) {
+      Assert.assertEquals("Residuals must be ignored", 
Expressions.alwaysTrue(), task.residual());
+    }
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, 
dataFiles.size());
+
+    Actions actions = Actions.forTable(icebergTableUnPartitioned);
+
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .filter(Expressions.equal("data", "0"))
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index b22da52..1f40738 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -36,6 +36,7 @@ public class RewriteDataFilesAction
     extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
 
   private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
 
   RewriteDataFilesAction(SparkSession spark, Table table) {
     super(table);
@@ -49,7 +50,10 @@ public class RewriteDataFilesAction
 
   @Override
   protected FileIO fileIO() {
-    return SparkUtil.serializableFileIO(table());
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
   }
 
   @Override

Reply via email to