This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cad1249 Add DataFile rewrite Action (#1083)
cad1249 is described below
commit cad124967ceb740178a3f65b75f19017494b430d
Author: Saisai Shao <[email protected]>
AuthorDate: Fri Jun 12 04:29:24 2020 +0800
Add DataFile rewrite Action (#1083)
---
.../java/org/apache/iceberg/BaseTableScan.java | 24 +-
.../org/apache/iceberg/util/TableScanUtil.java | 51 +++
.../java/org/apache/iceberg/actions/Actions.java | 4 +
.../org/apache/iceberg/actions/BaseAction.java | 13 +
.../iceberg/actions/RewriteDataFilesAction.java | 281 +++++++++++++++++
...ions.java => RewriteDataFilesActionResult.java} | 34 +-
.../iceberg/actions/RewriteManifestsAction.java | 11 +-
.../iceberg/spark/source/RowDataRewriter.java | 127 ++++++++
.../org/apache/iceberg/spark/source/Writer.java | 4 +-
.../actions/TestRewriteDataFilesAction.java | 344 +++++++++++++++++++++
10 files changed, 844 insertions(+), 49 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 27730e7..f36d7ab 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -26,7 +26,6 @@ import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
-import java.util.function.Function;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
import org.apache.iceberg.expressions.Binder;
@@ -35,11 +34,10 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.TableScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -266,14 +264,9 @@ abstract class BaseTableScan implements TableScan {
TableProperties.SPLIT_OPEN_FILE_COST,
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
}
- Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(),
openFileCost);
-
- CloseableIterable<FileScanTask> splitFiles = splitFiles(splitSize);
- return CloseableIterable.transform(
- CloseableIterable.combine(
- new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback,
weightFunc, true),
- splitFiles),
- BaseCombinedScanTask::new);
+ CloseableIterable<FileScanTask> fileScanTasks = planFiles();
+ CloseableIterable<FileScanTask> splitFiles =
TableScanUtil.splitFiles(fileScanTasks, splitSize);
+ return TableScanUtil.planTasks(splitFiles, splitSize, lookback,
openFileCost);
}
@Override
@@ -304,15 +297,6 @@ abstract class BaseTableScan implements TableScan {
.toString();
}
- private CloseableIterable<FileScanTask> splitFiles(long splitSize) {
- CloseableIterable<FileScanTask> fileScanTasks = planFiles();
- Iterable<FileScanTask> splitTasks = FluentIterable
- .from(fileScanTasks)
- .transformAndConcat(input -> input.split(splitSize));
- // Capture manifests which can be closed after scan planning
- return CloseableIterable.combine(splitTasks, fileScanTasks);
- }
-
/**
* To be able to make refinements {@link #select(Collection)} and {@link
#caseSensitive(boolean)} in any order,
* we resolve the schema to be projected lazily here.
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
new file mode 100644
index 0000000..9e4d1dd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.function.Function;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+
+public class TableScanUtil {
+
+ private TableScanUtil() {}
+
+ public static CloseableIterable<FileScanTask>
splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
+ Iterable<FileScanTask> splitTasks = FluentIterable
+ .from(tasks)
+ .transformAndConcat(input -> input.split(splitSize));
+ // Capture manifests which can be closed after scan planning
+ return CloseableIterable.combine(splitTasks, tasks);
+ }
+
+ public static CloseableIterable<CombinedScanTask>
planTasks(CloseableIterable<FileScanTask> splitFiles,
+ long splitSize,
int lookback, long openFileCost) {
+ Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(),
openFileCost);
+
+ return CloseableIterable.transform(
+ CloseableIterable.combine(
+ new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback,
weightFunc, true),
+ splitFiles),
+ BaseCombinedScanTask::new);
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
b/spark/src/main/java/org/apache/iceberg/actions/Actions.java
index f2d386d..1728052 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/Actions.java
@@ -47,4 +47,8 @@ public class Actions {
public RewriteManifestsAction rewriteManifests() {
return new RewriteManifestsAction(spark, table);
}
+
+ public RewriteDataFilesAction rewriteDataFiles() {
+ return new RewriteDataFilesAction(spark, table);
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
index 67d9714..0a449e6 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.actions;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.spark.util.SerializableConfiguration;
abstract class BaseAction<R> implements Action<R> {
@@ -37,4 +40,14 @@ abstract class BaseAction<R> implements Action<R> {
return tableName + "." + type;
}
}
+
+ protected FileIO fileIO(Table table) {
+ if (table.io() instanceof HadoopFileIO) {
+ // we need to use Spark's SerializableConfiguration to avoid issues with
Kryo serialization
+ SerializableConfiguration conf = new
SerializableConfiguration(((HadoopFileIO) table.io()).conf());
+ return new HadoopFileIO(conf::value);
+ } else {
+ return table.io();
+ }
+ }
}
diff --git
a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
new file mode 100644
index 0000000..e1a7584
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.source.RowDataRewriter;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RewriteDataFilesAction
+ extends BaseSnapshotUpdateAction<RewriteDataFilesAction,
RewriteDataFilesActionResult> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RewriteDataFilesAction.class);
+
+ private final JavaSparkContext sparkContext;
+ private final Table table;
+ private final FileIO fileIO;
+ private final EncryptionManager encryptionManager;
+ private final boolean caseSensitive;
+ private long targetSizeInBytes;
+ private int splitLookback;
+ private long splitOpenFileCost;
+
+ private PartitionSpec spec = null;
+ private Expression filter;
+
+ RewriteDataFilesAction(SparkSession spark, Table table) {
+ this.sparkContext = new JavaSparkContext(spark.sparkContext());
+ this.table = table;
+ this.spec = table.spec();
+ this.filter = Expressions.alwaysTrue();
+ this.caseSensitive =
Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
+
+ long splitSize = PropertyUtil.propertyAsLong(
+ table.properties(),
+ TableProperties.SPLIT_SIZE,
+ TableProperties.SPLIT_SIZE_DEFAULT);
+ long targetFileSize = PropertyUtil.propertyAsLong(
+ table.properties(),
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+ this.splitLookback = PropertyUtil.propertyAsInt(
+ table.properties(),
+ TableProperties.SPLIT_LOOKBACK,
+ TableProperties.SPLIT_LOOKBACK_DEFAULT);
+ this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+ table.properties(),
+ TableProperties.SPLIT_OPEN_FILE_COST,
+ TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+ this.fileIO = fileIO(table);
+ this.encryptionManager = table.encryption();
+ }
+
+ @Override
+ protected RewriteDataFilesAction self() {
+ return this;
+ }
+
+ @Override
+ protected Table table() {
+ return table;
+ }
+
+ /**
+ * Pass a PartitionSpec id to specify which PartitionSpec should be used in
DataFile rewrite
+ *
+ * @param specId PartitionSpec id to rewrite
+ * @return this for method chaining
+ */
+ public RewriteDataFilesAction outputSpecId(int specId) {
+ Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid
spec id %d", specId);
+ this.spec = table.specs().get(specId);
+ return this;
+ }
+
+ /**
+ * Specify the target rewrite data file size in bytes
+ *
+ * @param targetSize size in bytes of rewrite data file
+ * @return this for method chaining
+ */
+ public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
+ Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data
file size in bytes %d",
+ targetSize);
+ this.targetSizeInBytes = targetSize;
+ return this;
+ }
+
+ /**
+ * Specify the number of "bins" considered when trying to pack the next file
split into a task.
+ * Increasing this usually makes tasks a bit more even by considering more
ways to pack file regions into a single
+ * task with extra planning cost.
+ * <p>
+ * This configuration can reorder the incoming file regions. To preserve the
order of lower/upper bounds in file
+ * metadata, a lookback of 1 can be used.
+ *
+ * @param lookback number of "bins" considered when trying to pack the next
file split into a task.
+ * @return this for method chaining
+ */
+ public RewriteDataFilesAction splitLookback(int lookback) {
+ Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d",
lookback);
+ this.splitLookback = lookback;
+ return this;
+ }
+
+ /**
+ * Specify the minimum file size that is counted when packing files into one
"bin". If a file is smaller than this
+ * threshold, Iceberg will use this value as its weight instead.
+ * <p>
+ * This configuration controls the number of files compacted in each task; a
small value leads to more files being
+ * compacted per task. The default value is 4MB.
+ *
+ * @param openFileCost minimum file size to count to pack into one "bin".
+ * @return this for method chaining
+ */
+ public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
+ Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost
%d", openFileCost);
+ this.splitOpenFileCost = openFileCost;
+ return this;
+ }
+
+ /**
+ * Pass a row Expression to filter DataFiles to be rewritten. Note that all
files that may contain data matching the
+ * filter may be rewritten.
+ *
+ * @param expr Expression to filter out DataFiles
+ * @return this for method chaining
+ */
+ public RewriteDataFilesAction filter(Expression expr) {
+ this.filter = Expressions.and(filter, expr);
+ return this;
+ }
+
+ @Override
+ public RewriteDataFilesActionResult execute() {
+ CloseableIterable<FileScanTask> fileScanTasks = null;
+ try {
+ fileScanTasks = table.newScan()
+ .caseSensitive(caseSensitive)
+ .ignoreResiduals()
+ .filter(filter)
+ .planFiles();
+ } finally {
+ try {
+ if (fileScanTasks != null) {
+ fileScanTasks.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close task iterable", ioe);
+ }
+ }
+
+ Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks =
groupTasksByPartition(fileScanTasks.iterator());
+ Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks =
groupedTasks.entrySet().stream()
+ .filter(kv -> kv.getValue().size() > 1)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ // Nothing to rewrite if there's only one DataFile in each partition.
+ if (filteredGroupedTasks.isEmpty()) {
+ return RewriteDataFilesActionResult.empty();
+ }
+
+ // Split and combine tasks under each partition
+ List<CombinedScanTask> combinedScanTasks =
filteredGroupedTasks.values().stream()
+ .map(scanTasks -> {
+ CloseableIterable<FileScanTask> splitTasks =
TableScanUtil.splitFiles(
+ CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
+ return TableScanUtil.planTasks(splitTasks, targetSizeInBytes,
splitLookback, splitOpenFileCost);
+ })
+ .flatMap(Streams::stream)
+ .collect(Collectors.toList());
+
+ JavaRDD<CombinedScanTask> taskRDD =
sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
+
+ Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
+ Broadcast<EncryptionManager> encryption =
sparkContext.broadcast(encryptionManager);
+
+ RowDataRewriter rowDataRewriter =
+ new RowDataRewriter(table, spec, caseSensitive, io, encryption,
targetSizeInBytes);
+
+ List<DataFile> addedDataFiles =
rowDataRewriter.rewriteDataForTasks(taskRDD);
+ List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
+ .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+ .collect(Collectors.toList());
+ replaceDataFiles(currentDataFiles, addedDataFiles);
+
+ return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
+ }
+
+ private Map<StructLikeWrapper, Collection<FileScanTask>>
groupTasksByPartition(
+ CloseableIterator<FileScanTask> tasksIter) {
+ ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition =
Multimaps.newListMultimap(
+ Maps.newHashMap(), Lists::newArrayList);
+
+ try {
+ tasksIter.forEachRemaining(task -> {
+ StructLikeWrapper structLike =
StructLikeWrapper.wrap(task.file().partition());
+ tasksGroupedByPartition.put(structLike, task);
+ });
+
+ } finally {
+ try {
+ tasksIter.close();
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close task iterator", ioe);
+ }
+ }
+
+ return tasksGroupedByPartition.asMap();
+ }
+
+ private void replaceDataFiles(Iterable<DataFile> deletedDataFiles,
Iterable<DataFile> addedDataFiles) {
+ try {
+ RewriteFiles rewriteFiles = table.newRewrite();
+ rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles),
Sets.newHashSet(addedDataFiles));
+ commit(rewriteFiles);
+
+ } catch (Exception e) {
+ Tasks.foreach(Iterables.transform(addedDataFiles, f ->
f.path().toString()))
+ .noRetry()
+ .suppressFailureWhenFinished()
+ .onFailure((location, exc) -> LOG.warn("Failed to delete: {}",
location, exc))
+ .run(fileIO::deleteFile);
+
+ throw e;
+ }
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
similarity index 51%
copy from spark/src/main/java/org/apache/iceberg/actions/Actions.java
copy to
spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
index f2d386d..7313d9c 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/Actions.java
+++
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
@@ -19,32 +19,32 @@
package org.apache.iceberg.actions;
-import org.apache.iceberg.Table;
-import org.apache.spark.sql.SparkSession;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-public class Actions {
+public class RewriteDataFilesActionResult {
- private SparkSession spark;
- private Table table;
+ private static final RewriteDataFilesActionResult EMPTY =
+ new RewriteDataFilesActionResult(ImmutableList.of(), ImmutableList.of());
- private Actions(SparkSession spark, Table table) {
- this.spark = spark;
- this.table = table;
- }
+ private List<DataFile> deletedDataFiles;
+ private List<DataFile> addedDataFiles;
- public static Actions forTable(SparkSession spark, Table table) {
- return new Actions(spark, table);
+ public RewriteDataFilesActionResult(List<DataFile> deletedDataFiles,
List<DataFile> addedDataFiles) {
+ this.deletedDataFiles = deletedDataFiles;
+ this.addedDataFiles = addedDataFiles;
}
- public static Actions forTable(Table table) {
- return new Actions(SparkSession.active(), table);
+ static RewriteDataFilesActionResult empty() {
+ return EMPTY;
}
- public RemoveOrphanFilesAction removeOrphanFiles() {
- return new RemoveOrphanFilesAction(spark, table);
+ public List<DataFile> deletedDataFiles() {
+ return deletedDataFiles;
}
- public RewriteManifestsAction rewriteManifests() {
- return new RewriteManifestsAction(spark, table);
+ public List<DataFile> addedDataFiles() {
+ return addedDataFiles;
}
}
diff --git
a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
index 787852a..7a1c802 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
@@ -41,7 +41,6 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -64,7 +63,6 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,14 +102,7 @@ public class RewriteManifestsAction
table.properties(),
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-
- if (table.io() instanceof HadoopFileIO) {
- // we need to use Spark's SerializableConfiguration to avoid issues with
Kryo serialization
- SerializableConfiguration conf = new
SerializableConfiguration(((HadoopFileIO) table.io()).conf());
- this.fileIO = new HadoopFileIO(conf::value);
- } else {
- this.fileIO = table.io();
- }
+ this.fileIO = fileIO(table);
// default the staging location to the metadata location
TableOperations ops = ((HasTableOperations) table).operations();
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
new file mode 100644
index 0000000..0a56874
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RowDataRewriter implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Broadcast<FileIO> fileIO;
+ private final Broadcast<EncryptionManager> encryptionManager;
+ private final String tableSchema;
+ private final Writer.WriterFactory writerFactory;
+ private final boolean caseSensitive;
+
+ public RowDataRewriter(Table table, PartitionSpec spec, boolean
caseSensitive,
+ Broadcast<FileIO> fileIO,
Broadcast<EncryptionManager> encryptionManager,
+ long targetDataFileSizeInBytes) {
+ this.fileIO = fileIO;
+ this.encryptionManager = encryptionManager;
+
+ this.caseSensitive = caseSensitive;
+ this.tableSchema = SchemaParser.toJson(table.schema());
+
+ String formatString = table.properties().getOrDefault(
+ TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ FileFormat fileFormat =
FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ this.writerFactory = new Writer.WriterFactory(spec, fileFormat,
table.locationProvider(), table.properties(),
+ fileIO, encryptionManager, targetDataFileSizeInBytes, table.schema(),
SparkSchemaUtil.convert(table.schema()));
+ }
+
+ public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD)
{
+ JavaRDD<Writer.TaskCommit> taskCommitRDD =
taskRDD.map(this::rewriteDataForTask);
+
+ return taskCommitRDD.collect().stream()
+ .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
+ .collect(Collectors.toList());
+ }
+
+ private Writer.TaskCommit rewriteDataForTask(CombinedScanTask task) throws
Exception {
+ TaskContext context = TaskContext.get();
+
+ RowDataReader dataReader = new RowDataReader(task,
SchemaParser.fromJson(tableSchema),
+ SchemaParser.fromJson(tableSchema), fileIO.value(),
encryptionManager.value(), caseSensitive);
+
+ int partitionId = context.partitionId();
+ long taskId = context.taskAttemptId();
+ DataWriter<InternalRow> dataWriter =
writerFactory.createDataWriter(partitionId, taskId, 0);
+
+ try {
+ while (dataReader.next()) {
+ InternalRow row = dataReader.get();
+ dataWriter.write(row);
+ }
+
+ dataReader.close();
+ dataReader = null;
+ return (Writer.TaskCommit) dataWriter.commit();
+
+ } catch (Throwable originalThrowable) {
+ try {
+ LOG.error("Aborting task", originalThrowable);
+ context.markTaskFailed(originalThrowable);
+
+ LOG.error("Aborting commit for partition {} (task {}, attempt {},
stage {}.{})",
+ partitionId, taskId, context.attemptNumber(), context.stageId(),
context.stageAttemptNumber());
+ if (dataReader != null) {
+ dataReader.close();
+ }
+ dataWriter.abort();
+ LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage
{}.{})",
+ partitionId, taskId, context.taskAttemptId(), context.stageId(),
context.stageAttemptNumber());
+
+ } catch (Throwable inner) {
+ if (originalThrowable != inner) {
+ originalThrowable.addSuppressed(inner);
+ LOG.warn("Suppressing exception in catch: {}", inner.getMessage(),
inner);
+ }
+ }
+
+ if (originalThrowable instanceof Exception) {
+ throw originalThrowable;
+ } else {
+ throw new RuntimeException(originalThrowable);
+ }
+ }
+ }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 298bcb9..ffe16cf 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -236,7 +236,7 @@ class Writer implements DataSourceWriter {
}
- private static class TaskCommit implements WriterCommitMessage {
+ static class TaskCommit implements WriterCommitMessage {
private final DataFile[] files;
TaskCommit() {
@@ -256,7 +256,7 @@ class Writer implements DataSourceWriter {
}
}
- private static class WriterFactory implements DataWriterFactory<InternalRow>
{
+ static class WriterFactory implements DataWriterFactory<InternalRow> {
private final PartitionSpec spec;
private final FileFormat format;
private final LocationProvider locations;
diff --git
a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
new file mode 100644
index 0000000..75a88fa
--- /dev/null
+++
b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestRewriteDataFilesAction {
+
+ private static final HadoopTables TABLES = new HadoopTables(new
Configuration());
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get())
+ );
+
+ private static SparkSession spark;
+
+ @BeforeClass
+ public static void startSpark() {
+ TestRewriteDataFilesAction.spark = SparkSession.builder()
+ .master("local[2]")
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestRewriteDataFilesAction.spark;
+ TestRewriteDataFilesAction.spark = null;
+ currentSpark.stop();
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private String tableLocation = null;
+
+ @Before
+ public void setupTableLocation() throws Exception {
+ File tableDir = temp.newFolder();
+ this.tableLocation = tableDir.toURI().toString();
+ }
+
+ @Test
+ public void testRewriteDataFilesEmptyTable() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+ Actions actions = Actions.forTable(table);
+
+ actions.rewriteDataFiles().execute();
+
+ Assert.assertNull("Table must stay empty", table.currentSnapshot());
+ }
+
+ @Test
+ public void testRewriteDataFilesUnpartitionedTable() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ List<ThreeColumnRecord> records1 = Lists.newArrayList(
+ new ThreeColumnRecord(1, null, "AAAA"),
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")
+ );
+ writeRecords(records1);
+
+ List<ThreeColumnRecord> records2 = Lists.newArrayList(
+ new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")
+ );
+ writeRecords(records2);
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ Assert.assertEquals("Should have 4 data files before rewrite", 4,
dataFiles.size());
+
+ Actions actions = Actions.forTable(table);
+
+ RewriteDataFilesActionResult result = actions.rewriteDataFiles().execute();
+ Assert.assertEquals("Action should rewrite 4 data files", 4,
result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+ List<DataFile> dataFiles1 =
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+ Assert.assertEquals("Should have 1 data files before rewrite", 1,
dataFiles1.size());
+
+ List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(records1);
+ expectedRecords.addAll(records2);
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2")
+ .as(Encoders.bean(ThreeColumnRecord.class))
+ .collectAsList();
+
+ Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesPartitionedTable() {
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+ .identity("c1")
+ .truncate("c2", 2)
+ .build();
+ Map<String, String> options = Maps.newHashMap();
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ List<ThreeColumnRecord> records1 = Lists.newArrayList(
+ new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+ new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+ );
+ writeRecords(records1);
+
+ List<ThreeColumnRecord> records2 = Lists.newArrayList(
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+ );
+ writeRecords(records2);
+
+ List<ThreeColumnRecord> records3 = Lists.newArrayList(
+ new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+ new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+ );
+ writeRecords(records3);
+
+ List<ThreeColumnRecord> records4 = Lists.newArrayList(
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+ );
+ writeRecords(records4);
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ Assert.assertEquals("Should have 8 data files before rewrite", 8,
dataFiles.size());
+
+ Actions actions = Actions.forTable(table);
+
+ RewriteDataFilesActionResult result = actions.rewriteDataFiles().execute();
+ Assert.assertEquals("Action should rewrite 8 data files", 8,
result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFiles().size());
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+ List<DataFile> dataFiles1 =
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+ Assert.assertEquals("Should have 4 data files before rewrite", 4,
dataFiles1.size());
+
+ List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(records1);
+ expectedRecords.addAll(records2);
+ expectedRecords.addAll(records3);
+ expectedRecords.addAll(records4);
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+ .as(Encoders.bean(ThreeColumnRecord.class))
+ .collectAsList();
+
+ Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesWithFilter() {
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
+ .identity("c1")
+ .truncate("c2", 2)
+ .build();
+ Map<String, String> options = Maps.newHashMap();
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ List<ThreeColumnRecord> records1 = Lists.newArrayList(
+ new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"),
+ new ThreeColumnRecord(1, "AAAAAAAAAA", "CCCC")
+ );
+ writeRecords(records1);
+
+ List<ThreeColumnRecord> records2 = Lists.newArrayList(
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "DDDD")
+ );
+ writeRecords(records2);
+
+ List<ThreeColumnRecord> records3 = Lists.newArrayList(
+ new ThreeColumnRecord(2, "AAAAAAAAAA", "EEEE"),
+ new ThreeColumnRecord(2, "AAAAAAAAAA", "GGGG")
+ );
+ writeRecords(records3);
+
+ List<ThreeColumnRecord> records4 = Lists.newArrayList(
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "FFFF"),
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "HHHH")
+ );
+ writeRecords(records4);
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ Assert.assertEquals("Should have 8 data files before rewrite", 8,
dataFiles.size());
+
+ Actions actions = Actions.forTable(table);
+
+ RewriteDataFilesActionResult result = actions
+ .rewriteDataFiles()
+ .filter(Expressions.equal("c1", 1))
+ .filter(Expressions.startsWith("c2", "AA"))
+ .execute();
+ Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks1 = table.newScan().planFiles();
+ List<DataFile> dataFiles1 =
Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+ Assert.assertEquals("Should have 7 data files before rewrite", 7,
dataFiles1.size());
+
+ List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(records1);
+ expectedRecords.addAll(records2);
+ expectedRecords.addAll(records3);
+ expectedRecords.addAll(records4);
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords = resultDF.sort("c1", "c2", "c3")
+ .as(Encoders.bean(ThreeColumnRecord.class))
+ .collectAsList();
+
+ Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ }
+
+ @Test
+ public void testRewriteLargeTableHasResiduals() {
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100");
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ // all records belong to the same partition
+ List<ThreeColumnRecord> records = Lists.newArrayList();
+ for (int i = 0; i < 100; i++) {
+ records.add(new ThreeColumnRecord(i, String.valueOf(i), String.valueOf(i
% 4)));
+ }
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+ writeDF(df);
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks = table.newScan()
+ .ignoreResiduals()
+ .filter(Expressions.equal("c3", "0"))
+ .planFiles();
+ for (FileScanTask task : tasks) {
+ Assert.assertEquals("Residuals must be ignored",
Expressions.alwaysTrue(), task.residual());
+ }
+ List<DataFile> dataFiles =
Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ Assert.assertEquals("Should have 2 data files before rewrite", 2,
dataFiles.size());
+
+ Actions actions = Actions.forTable(table);
+
+ RewriteDataFilesActionResult result = actions
+ .rewriteDataFiles()
+ .filter(Expressions.equal("c3", "0"))
+ .execute();
+ Assert.assertEquals("Action should rewrite 2 data files", 2,
result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFiles().size());
+
+ table.refresh();
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords = resultDF.sort("c1")
+ .as(Encoders.bean(ThreeColumnRecord.class))
+ .collectAsList();
+
+ Assert.assertEquals("Rows must match", records, actualRecords);
+ }
+
+ private void writeRecords(List<ThreeColumnRecord> records) {
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+ writeDF(df);
+ }
+
+ private void writeDF(Dataset<Row> df) {
+ df.select("c1", "c2", "c3")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation);
+ }
+}