This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cad1249  Add DataFile rewrite Action (#1083)
cad1249 is described below

commit cad124967ceb740178a3f65b75f19017494b430d
Author: Saisai Shao <[email protected]>
AuthorDate: Fri Jun 12 04:29:24 2020 +0800

    Add DataFile rewrite Action (#1083)
---
 .../java/org/apache/iceberg/BaseTableScan.java     |  24 +-
 .../org/apache/iceberg/util/TableScanUtil.java     |  51 +++
 .../java/org/apache/iceberg/actions/Actions.java   |   4 +
 .../org/apache/iceberg/actions/BaseAction.java     |  13 +
 .../iceberg/actions/RewriteDataFilesAction.java    | 281 +++++++++++++++++
 ...ions.java => RewriteDataFilesActionResult.java} |  34 +-
 .../iceberg/actions/RewriteManifestsAction.java    |  11 +-
 .../iceberg/spark/source/RowDataRewriter.java      | 127 ++++++++
 .../org/apache/iceberg/spark/source/Writer.java    |   4 +-
 .../actions/TestRewriteDataFilesAction.java        | 344 +++++++++++++++++++++
 10 files changed, 844 insertions(+), 49 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java 
b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 27730e7..f36d7ab 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -26,7 +26,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
-import java.util.function.Function;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
 import org.apache.iceberg.expressions.Binder;
@@ -35,11 +34,10 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.TableScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -266,14 +264,9 @@ abstract class BaseTableScan implements TableScan {
           TableProperties.SPLIT_OPEN_FILE_COST, 
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
     }
 
-    Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), 
openFileCost);
-
-    CloseableIterable<FileScanTask> splitFiles = splitFiles(splitSize);
-    return CloseableIterable.transform(
-        CloseableIterable.combine(
-            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, 
weightFunc, true),
-            splitFiles),
-        BaseCombinedScanTask::new);
+    CloseableIterable<FileScanTask> fileScanTasks = planFiles();
+    CloseableIterable<FileScanTask> splitFiles = 
TableScanUtil.splitFiles(fileScanTasks, splitSize);
+    return TableScanUtil.planTasks(splitFiles, splitSize, lookback, 
openFileCost);
   }
 
   @Override
@@ -304,15 +297,6 @@ abstract class BaseTableScan implements TableScan {
         .toString();
   }
 
-  private CloseableIterable<FileScanTask> splitFiles(long splitSize) {
-    CloseableIterable<FileScanTask> fileScanTasks = planFiles();
-    Iterable<FileScanTask> splitTasks = FluentIterable
-        .from(fileScanTasks)
-        .transformAndConcat(input -> input.split(splitSize));
-    // Capture manifests which can be closed after scan planning
-    return CloseableIterable.combine(splitTasks, fileScanTasks);
-  }
-
   /**
    * To be able to make refinements {@link #select(Collection)} and {@link 
#caseSensitive(boolean)} in any order,
    * we resolve the schema to be projected lazily here.
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java 
b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
new file mode 100644
index 0000000..9e4d1dd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.function.Function;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+
+public class TableScanUtil {
+
+  private TableScanUtil() {}
+
+  public static CloseableIterable<FileScanTask> 
splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
+    Iterable<FileScanTask> splitTasks = FluentIterable
+        .from(tasks)
+        .transformAndConcat(input -> input.split(splitSize));
+    // Capture manifests which can be closed after scan planning
+    return CloseableIterable.combine(splitTasks, tasks);
+  }
+
+  public static CloseableIterable<CombinedScanTask> 
planTasks(CloseableIterable<FileScanTask> splitFiles,
+                                                              long splitSize, 
int lookback, long openFileCost) {
+    Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), 
openFileCost);
+
+    return CloseableIterable.transform(
+        CloseableIterable.combine(
+            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, 
weightFunc, true),
+            splitFiles),
+        BaseCombinedScanTask::new);
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java 
b/spark/src/main/java/org/apache/iceberg/actions/Actions.java
index f2d386d..1728052 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/Actions.java
@@ -47,4 +47,8 @@ public class Actions {
   public RewriteManifestsAction rewriteManifests() {
     return new RewriteManifestsAction(spark, table);
   }
+
+  public RewriteDataFilesAction rewriteDataFiles() {
+    return new RewriteDataFilesAction(spark, table);
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java 
b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
index 67d9714..0a449e6 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.actions;
 
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.spark.util.SerializableConfiguration;
 
 abstract class BaseAction<R> implements Action<R> {
 
@@ -37,4 +40,14 @@ abstract class BaseAction<R> implements Action<R> {
       return tableName + "." + type;
     }
   }
+
+  protected FileIO fileIO(Table table) {
+    if (table.io() instanceof HadoopFileIO) {
+      // we need to use Spark's SerializableConfiguration to avoid issues with 
Kryo serialization
+      SerializableConfiguration conf = new 
SerializableConfiguration(((HadoopFileIO) table.io()).conf());
+      return new HadoopFileIO(conf::value);
+    } else {
+      return table.io();
+    }
+  }
 }
diff --git 
a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java 
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
new file mode 100644
index 0000000..e1a7584
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.source.RowDataRewriter;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RewriteDataFilesAction
+    extends BaseSnapshotUpdateAction<RewriteDataFilesAction, 
RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteDataFilesAction.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  private PartitionSpec spec = null;
+  private Expression filter;
+
+  RewriteDataFilesAction(SparkSession spark, Table table) {
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.table = table;
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    this.caseSensitive = 
Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    this.fileIO = fileIO(table);
+    this.encryptionManager = table.encryption();
+  }
+
+  @Override
+  protected RewriteDataFilesAction self() {
+    return this;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in 
DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public RewriteDataFilesAction outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid 
spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the target rewrite data file size in bytes
+   *
+   * @param targetSize size in bytes of rewrite data file
+   * @return this for method chaining
+   */
+  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
+    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data 
file size in bytes %d",
+        targetSize);
+    this.targetSizeInBytes = targetSize;
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file 
split into a task.
+   * Increasing this usually makes tasks a bit more even by considering more 
ways to pack file regions into a single
+   * task with extra planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve 
order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next 
file split into a task.
+   * @return this for method chaining
+   */
+  public RewriteDataFilesAction splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", 
lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the 
read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, 
small value would lead to a
+   * high compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost 
%d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all 
files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+  public RewriteDataFilesAction filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  @Override
+  public RewriteDataFilesActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .filter(filter)
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = 
groupTasksByPartition(fileScanTasks.iterator());
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = 
groupedTasks.entrySet().stream()
+        .filter(kv -> kv.getValue().size() > 1)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    // Nothing to rewrite if there's only one DataFile in each partition.
+    if (filteredGroupedTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+
+    // Split and combine tasks under each partition
+    List<CombinedScanTask> combinedScanTasks = 
filteredGroupedTasks.values().stream()
+        .map(scanTasks -> {
+          CloseableIterable<FileScanTask> splitTasks = 
TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
+          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, 
splitLookback, splitOpenFileCost);
+        })
+        .flatMap(Streams::stream)
+        .collect(Collectors.toList());
+
+    JavaRDD<CombinedScanTask> taskRDD = 
sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
+
+    Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
+    Broadcast<EncryptionManager> encryption = 
sparkContext.broadcast(encryptionManager);
+
+    RowDataRewriter rowDataRewriter =
+        new RowDataRewriter(table, spec, caseSensitive, io, encryption, 
targetSizeInBytes);
+
+    List<DataFile> addedDataFiles = 
rowDataRewriter.rewriteDataForTasks(taskRDD);
+    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
+        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+        .collect(Collectors.toList());
+    replaceDataFiles(currentDataFiles, addedDataFiles);
+
+    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
+  }
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> 
groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = 
Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+
+    try {
+      tasksIter.forEachRemaining(task -> {
+        StructLikeWrapper structLike = 
StructLikeWrapper.wrap(task.file().partition());
+        tasksGroupedByPartition.put(structLike, task);
+      });
+
+    } finally {
+      try {
+        tasksIter.close();
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterator", ioe);
+      }
+    }
+
+    return tasksGroupedByPartition.asMap();
+  }
+
+  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, 
Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), 
Sets.newHashSet(addedDataFiles));
+      commit(rewriteFiles);
+
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> 
f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", 
location, exc))
+          .run(fileIO::deleteFile);
+
+      throw e;
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java 
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
similarity index 51%
copy from spark/src/main/java/org/apache/iceberg/actions/Actions.java
copy to 
spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
index f2d386d..7313d9c 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
+++ 
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
@@ -19,32 +19,32 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.Table;
-import org.apache.spark.sql.SparkSession;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
-public class Actions {
+public class RewriteDataFilesActionResult {
 
-  private SparkSession spark;
-  private Table table;
+  private static final RewriteDataFilesActionResult EMPTY =
+      new RewriteDataFilesActionResult(ImmutableList.of(), ImmutableList.of());
 
-  private Actions(SparkSession spark, Table table) {
-    this.spark = spark;
-    this.table = table;
-  }
+  private List<DataFile> deletedDataFiles;
+  private List<DataFile> addedDataFiles;
 
-  public static Actions forTable(SparkSession spark, Table table) {
-    return new Actions(spark, table);
+  public RewriteDataFilesActionResult(List<DataFile> deletedDataFiles, 
List<DataFile> addedDataFiles) {
+    this.deletedDataFiles = deletedDataFiles;
+    this.addedDataFiles = addedDataFiles;
   }
 
-  public static Actions forTable(Table table) {
-    return new Actions(SparkSession.active(), table);
+  static RewriteDataFilesActionResult empty() {
+    return EMPTY;
   }
 
-  public RemoveOrphanFilesAction removeOrphanFiles() {
-    return new RemoveOrphanFilesAction(spark, table);
+  public List<DataFile> deletedDataFiles() {
+    return deletedDataFiles;
   }
 
-  public RewriteManifestsAction rewriteManifests() {
-    return new RewriteManifestsAction(spark, table);
+  public List<DataFile> addedDataFiles() {
+    return addedDataFiles;
   }
 }
diff --git 
a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java 
b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
index 787852a..7a1c802 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
@@ -41,7 +41,6 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -64,7 +63,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,14 +102,7 @@ public class RewriteManifestsAction
         table.properties(),
         TableProperties.MANIFEST_TARGET_SIZE_BYTES,
         TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-
-    if (table.io() instanceof HadoopFileIO) {
-      // we need to use Spark's SerializableConfiguration to avoid issues with 
Kryo serialization
-      SerializableConfiguration conf = new 
SerializableConfiguration(((HadoopFileIO) table.io()).conf());
-      this.fileIO = new HadoopFileIO(conf::value);
-    } else {
-      this.fileIO = table.io();
-    }
+    this.fileIO = fileIO(table);
 
     // default the staging location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
new file mode 100644
index 0000000..0a56874
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RowDataRewriter implements Serializable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataRewriter.class);
+
+  private final Broadcast<FileIO> fileIO;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final String tableSchema;
+  private final Writer.WriterFactory writerFactory;
+  private final boolean caseSensitive;
+
+  public RowDataRewriter(Table table, PartitionSpec spec, boolean 
caseSensitive,
+                         Broadcast<FileIO> fileIO, 
Broadcast<EncryptionManager> encryptionManager,
+                         long targetDataFileSizeInBytes) {
+    this.fileIO = fileIO;
+    this.encryptionManager = encryptionManager;
+
+    this.caseSensitive = caseSensitive;
+    this.tableSchema = SchemaParser.toJson(table.schema());
+
+    String formatString = table.properties().getOrDefault(
+        TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    FileFormat fileFormat = 
FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    this.writerFactory = new Writer.WriterFactory(spec, fileFormat, 
table.locationProvider(), table.properties(),
+        fileIO, encryptionManager, targetDataFileSizeInBytes, table.schema(), 
SparkSchemaUtil.convert(table.schema()));
+  }
+
+  public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) 
{
+    JavaRDD<Writer.TaskCommit> taskCommitRDD = 
taskRDD.map(this::rewriteDataForTask);
+
+    return taskCommitRDD.collect().stream()
+        .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
+        .collect(Collectors.toList());
+  }
+
+  private Writer.TaskCommit rewriteDataForTask(CombinedScanTask task) throws 
Exception {
+    TaskContext context = TaskContext.get();
+
+    RowDataReader dataReader = new RowDataReader(task, 
SchemaParser.fromJson(tableSchema),
+        SchemaParser.fromJson(tableSchema), fileIO.value(), 
encryptionManager.value(), caseSensitive);
+
+    int partitionId = context.partitionId();
+    long taskId = context.taskAttemptId();
+    DataWriter<InternalRow> dataWriter = 
writerFactory.createDataWriter(partitionId, taskId, 0);
+
+    try {
+      while (dataReader.next()) {
+        InternalRow row = dataReader.get();
+        dataWriter.write(row);
+      }
+
+      dataReader.close();
+      dataReader = null;
+      return (Writer.TaskCommit) dataWriter.commit();
+
+    } catch (Throwable originalThrowable) {
+      try {
+        LOG.error("Aborting task", originalThrowable);
+        context.markTaskFailed(originalThrowable);
+
+        LOG.error("Aborting commit for partition {} (task {}, attempt {}, 
stage {}.{})",
+            partitionId, taskId, context.attemptNumber(), context.stageId(), 
context.stageAttemptNumber());
+        if (dataReader != null) {
+          dataReader.close();
+        }
+        dataWriter.abort();
+        LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage 
{}.{})",
+            partitionId, taskId, context.taskAttemptId(), context.stageId(), 
context.stageAttemptNumber());
+
+      } catch (Throwable inner) {
+        if (originalThrowable != inner) {
+          originalThrowable.addSuppressed(inner);
+          LOG.warn("Suppressing exception in catch: {}", inner.getMessage(), 
inner);
+        }
+      }
+
+      if (originalThrowable instanceof Exception) {
+        throw originalThrowable;
+      } else {
+        throw new RuntimeException(originalThrowable);
+      }
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java 
b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 298bcb9..ffe16cf 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -236,7 +236,7 @@ class Writer implements DataSourceWriter {
   }
 
 
-  private static class TaskCommit implements WriterCommitMessage {
+  static class TaskCommit implements WriterCommitMessage {
     private final DataFile[] files;
 
     TaskCommit() {
@@ -256,7 +256,7 @@ class Writer implements DataSourceWriter {
     }
   }
 
-  private static class WriterFactory implements DataWriterFactory<InternalRow> 
{
+  static class WriterFactory implements DataWriterFactory<InternalRow> {
     private final PartitionSpec spec;
     private final FileFormat format;
     private final LocationProvider locations;
diff --git 
a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
 
b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
new file mode 100644
index 0000000..75a88fa
--- /dev/null
+++ 
b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestRewriteDataFilesAction {
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "c1", Types.IntegerType.get()),
+      optional(2, "c2", Types.StringType.get()),
+      optional(3, "c3", Types.StringType.get())
+  );
+
+  private static SparkSession spark;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestRewriteDataFilesAction.spark = SparkSession.builder()
+        .master("local[2]")
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestRewriteDataFilesAction.spark;
+    TestRewriteDataFilesAction.spark = null;
+    currentSpark.stop();
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private String tableLocation = null;
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    File tableDir = temp.newFolder();
+    this.tableLocation = tableDir.toURI().toString();
+  }
+
+  @Test
+  public void testRewriteDataFilesEmptyTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    Actions actions = Actions.forTable(table);
+
+    actions.rewriteDataFiles().execute();
+
+    Assert.assertNull("Table must stay empty", table.currentSnapshot());
+  }
+
+  @Test
+  public void testRewriteDataFilesUnpartitionedTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, null, "AAAA"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+        new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, 
dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions.rewriteDataFiles().execute();
+    Assert.assertEquals("Action should rewrite 4 data files", 4, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 1 data files before rewrite", 1, 
dataFiles1.size());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesPartitionedTable() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .truncate("c2", 2)
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, 
dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions.rewriteDataFiles().execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, 
dataFiles1.size());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteDataFilesWithFilter() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+        .identity("c1")
+        .truncate("c2", 2)
+        .build();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+    );
+    writeRecords(records1);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+    );
+    writeRecords(records2);
+
+    List<ThreeColumnRecord> records3 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+        new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+    );
+    writeRecords(records3);
+
+    List<ThreeColumnRecord> records4 = Lists.newArrayList(
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+        new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+    );
+    writeRecords(records4);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 8 data files before rewrite", 8, 
dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .filter(Expressions.equal("c1", 1))
+        .filter(Expressions.startsWith("c2", "AA"))
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+    List<DataFile> dataFiles1 = 
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 7 data files before rewrite", 7, 
dataFiles1.size());
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+    expectedRecords.addAll(records3);
+    expectedRecords.addAll(records4);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
+  @Test
+  public void testRewriteLargeTableHasResiduals() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100");
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    // all records belong to the same partition
+    List<ThreeColumnRecord> records = Lists.newArrayList();
+    for (int i = 0; i < 100; i++) {
+      records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i 
% 4)));
+    }
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan()
+        .ignoreResiduals()
+        .filter(Expressions.equal("c3", "0"))
+        .planFiles();
+    for (FileScanTask task : tasks) {
+      Assert.assertEquals("Residuals must be ignored", 
Expressions.alwaysTrue(), task.residual());
+    }
+    List<DataFile> dataFiles = 
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, 
dataFiles.size());
+
+    Actions actions = Actions.forTable(table);
+
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .filter(Expressions.equal("c3", "0"))
+        .execute();
+    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFiles().size());
+
+    table.refresh();
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords = resultDF.sort("c1")
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Rows must match", records, actualRecords);
+  }
+
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+  }
+
+  private void writeDF(Dataset<Row> df) {
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+  }
+}

Reply via email to