This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e69418ae8d Spark 3.5: Extend action for rewriting manifests to support deletes (#9020)
e69418ae8d is described below
commit e69418ae8d6d041acae0af4fe59ea3c8b8f8705f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 17 18:34:25 2023 -0800
Spark 3.5: Extend action for rewriting manifests to support deletes (#9020)
---
.../{SparkDataFile.java => SparkContentFile.java} | 75 +++--
.../org/apache/iceberg/spark/SparkDataFile.java | 192 +------------
.../org/apache/iceberg/spark/SparkDeleteFile.java | 40 +++
.../spark/actions/RewriteManifestsSparkAction.java | 237 +++++++++++----
.../spark/actions/TestRewriteManifestsAction.java | 318 +++++++++++++++++++++
.../iceberg/spark/source/TestSparkDataFile.java | 194 ++++++++++---
6 files changed, 747 insertions(+), 309 deletions(-)
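For readers following this change: the new tests drive the delete-manifest rewrite through the existing public action API. A minimal usage sketch under stated assumptions (the table handle and the option value below are placeholders, not part of this commit):

    import org.apache.iceberg.ManifestContent;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteManifests;
    import org.apache.iceberg.spark.actions.RewriteManifestsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;

    Table table = loadTable();  // placeholder: obtain the table from your catalog
    // Rewrite only delete manifests, mirroring the rewriteIf predicate used in the tests below.
    RewriteManifests.Result result =
        SparkActions.get()
            .rewriteManifests(table)
            .rewriteIf(manifest -> manifest.content() == ManifestContent.DELETES)
            .option(RewriteManifestsSparkAction.USE_CACHING, "false")
            .execute();
    // result.rewrittenManifests() and result.addedManifests() report what changed.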
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
similarity index 79%
copy from spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
copy to spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index d8a3d5714e..3dd8049c13 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -21,7 +21,9 @@ package org.apache.iceberg.spark;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -31,8 +33,9 @@ import org.apache.iceberg.util.StructProjection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
-public class SparkDataFile implements DataFile {
+public abstract class SparkContentFile<F> implements ContentFile<F> {
+ private final int fileContentPosition;
private final int filePathPosition;
private final int fileFormatPosition;
private final int partitionPosition;
@@ -47,20 +50,16 @@ public class SparkDataFile implements DataFile {
private final int keyMetadataPosition;
private final int splitOffsetsPosition;
private final int sortOrderIdPosition;
+ private final int equalityIdsPosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
private final Type keyMetadataType;
private final SparkStructLike wrappedPartition;
- private final StructLike partitionProjection;
+ private final StructLike projectedPartition;
private Row wrapped;
- public SparkDataFile(Types.StructType type, StructType sparkType) {
- this(type, null, sparkType);
- }
-
- public SparkDataFile(
- Types.StructType type, Types.StructType projectedType, StructType sparkType) {
+ SparkContentFile(Types.StructType type, Types.StructType projectedType, StructType sparkType) {
this.lowerBoundsType = type.fieldType(DataFile.LOWER_BOUNDS.name());
this.upperBoundsType = type.fieldType(DataFile.UPPER_BOUNDS.name());
this.keyMetadataType = type.fieldType(DataFile.KEY_METADATA.name());
@@ -71,20 +70,20 @@ public class SparkDataFile implements DataFile {
if (projectedType != null) {
Types.StructType projectedPartitionType =
projectedType.fieldType(DataFile.PARTITION_NAME).asStructType();
- this.partitionProjection =
- StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+ StructProjection partitionProjection =
+ StructProjection.create(partitionType, projectedPartitionType);
+ this.projectedPartition = partitionProjection.wrap(wrappedPartition);
} else {
- this.partitionProjection = wrappedPartition;
+ this.projectedPartition = wrappedPartition;
}
Map<String, Integer> positions = Maps.newHashMap();
- type.fields()
- .forEach(
- field -> {
- String fieldName = field.name();
- positions.put(fieldName, fieldPosition(fieldName, sparkType));
- });
+ for (Types.NestedField field : type.fields()) {
+ String fieldName = field.name();
+ positions.put(fieldName, fieldPosition(fieldName, sparkType));
+ }
+ this.fileContentPosition = positions.get(DataFile.CONTENT.name());
this.filePathPosition = positions.get(DataFile.FILE_PATH.name());
this.fileFormatPosition = positions.get(DataFile.FILE_FORMAT.name());
this.partitionPosition = positions.get(DataFile.PARTITION_NAME);
@@ -99,16 +98,19 @@ public class SparkDataFile implements DataFile {
this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
+ this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
}
- public SparkDataFile wrap(Row row) {
+ public F wrap(Row row) {
this.wrapped = row;
if (wrappedPartition.size() > 0) {
- this.wrappedPartition.wrap(row.getAs(partitionPosition));
+ wrappedPartition.wrap(row.getAs(partitionPosition));
}
- return this;
+ return asFile();
}
+ protected abstract F asFile();
+
@Override
public Long pos() {
return null;
@@ -119,6 +121,14 @@ public class SparkDataFile implements DataFile {
return -1;
}
+ @Override
+ public FileContent content() {
+ if (wrapped.isNullAt(fileContentPosition)) {
+ return null;
+ }
+ return FileContent.values()[wrapped.getInt(fileContentPosition)];
+ }
+
@Override
public CharSequence path() {
return wrapped.getAs(filePathPosition);
@@ -131,7 +141,7 @@ public class SparkDataFile implements DataFile {
@Override
public StructLike partition() {
- return partitionProjection;
+ return projectedPartition;
}
@Override
@@ -156,16 +166,18 @@ public class SparkDataFile implements DataFile {
@Override
public Map<Integer, Long> nullValueCounts() {
- return wrapped.isNullAt(nullValueCountsPosition)
- ? null
- : wrapped.getJavaMap(nullValueCountsPosition);
+ if (wrapped.isNullAt(nullValueCountsPosition)) {
+ return null;
+ }
+ return wrapped.getJavaMap(nullValueCountsPosition);
}
@Override
public Map<Integer, Long> nanValueCounts() {
- return wrapped.isNullAt(nanValueCountsPosition)
- ? null
- : wrapped.getJavaMap(nanValueCountsPosition);
+ if (wrapped.isNullAt(nanValueCountsPosition)) {
+ return null;
+ }
+ return wrapped.getJavaMap(nanValueCountsPosition);
}
@Override
@@ -188,12 +200,12 @@ public class SparkDataFile implements DataFile {
}
@Override
- public DataFile copy() {
+ public F copy() {
throw new UnsupportedOperationException("Not implemented: copy");
}
@Override
- public DataFile copyWithoutStats() {
+ public F copyWithoutStats() {
+ throw new UnsupportedOperationException("Not implemented: copyWithoutStats");
}
@@ -207,6 +219,11 @@ public class SparkDataFile implements DataFile {
return wrapped.getAs(sortOrderIdPosition);
}
+ @Override
+ public List<Integer> equalityFieldIds() {
+ return wrapped.isNullAt(equalityIdsPosition) ? null : wrapped.getList(equalityIdsPosition);
+ }
+
private int fieldPosition(String name, StructType sparkType) {
try {
return sparkType.fieldIndex(name);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index d8a3d5714e..543ebf3f9e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -18,209 +18,29 @@
*/
package org.apache.iceberg.spark;
-import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
-public class SparkDataFile implements DataFile {
-
- private final int filePathPosition;
- private final int fileFormatPosition;
- private final int partitionPosition;
- private final int recordCountPosition;
- private final int fileSizeInBytesPosition;
- private final int columnSizesPosition;
- private final int valueCountsPosition;
- private final int nullValueCountsPosition;
- private final int nanValueCountsPosition;
- private final int lowerBoundsPosition;
- private final int upperBoundsPosition;
- private final int keyMetadataPosition;
- private final int splitOffsetsPosition;
- private final int sortOrderIdPosition;
- private final Type lowerBoundsType;
- private final Type upperBoundsType;
- private final Type keyMetadataType;
-
- private final SparkStructLike wrappedPartition;
- private final StructLike partitionProjection;
- private Row wrapped;
+public class SparkDataFile extends SparkContentFile<DataFile> implements DataFile {
public SparkDataFile(Types.StructType type, StructType sparkType) {
- this(type, null, sparkType);
+ super(type, null, sparkType);
}
public SparkDataFile(
Types.StructType type, Types.StructType projectedType, StructType sparkType) {
- this.lowerBoundsType = type.fieldType(DataFile.LOWER_BOUNDS.name());
- this.upperBoundsType = type.fieldType(DataFile.UPPER_BOUNDS.name());
- this.keyMetadataType = type.fieldType(DataFile.KEY_METADATA.name());
-
- Types.StructType partitionType = type.fieldType(DataFile.PARTITION_NAME).asStructType();
- this.wrappedPartition = new SparkStructLike(partitionType);
-
- if (projectedType != null) {
- Types.StructType projectedPartitionType =
- projectedType.fieldType(DataFile.PARTITION_NAME).asStructType();
- this.partitionProjection =
- StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
- } else {
- this.partitionProjection = wrappedPartition;
- }
-
- Map<String, Integer> positions = Maps.newHashMap();
- type.fields()
- .forEach(
- field -> {
- String fieldName = field.name();
- positions.put(fieldName, fieldPosition(fieldName, sparkType));
- });
-
- this.filePathPosition = positions.get(DataFile.FILE_PATH.name());
- this.fileFormatPosition = positions.get(DataFile.FILE_FORMAT.name());
- this.partitionPosition = positions.get(DataFile.PARTITION_NAME);
- this.recordCountPosition = positions.get(DataFile.RECORD_COUNT.name());
- this.fileSizeInBytesPosition = positions.get(DataFile.FILE_SIZE.name());
- this.columnSizesPosition = positions.get(DataFile.COLUMN_SIZES.name());
- this.valueCountsPosition = positions.get(DataFile.VALUE_COUNTS.name());
- this.nullValueCountsPosition = positions.get(DataFile.NULL_VALUE_COUNTS.name());
- this.nanValueCountsPosition = positions.get(DataFile.NAN_VALUE_COUNTS.name());
- this.lowerBoundsPosition = positions.get(DataFile.LOWER_BOUNDS.name());
- this.upperBoundsPosition = positions.get(DataFile.UPPER_BOUNDS.name());
- this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
- this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
- this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
+ super(type, projectedType, sparkType);
}
- public SparkDataFile wrap(Row row) {
- this.wrapped = row;
- if (wrappedPartition.size() > 0) {
- this.wrappedPartition.wrap(row.getAs(partitionPosition));
- }
+ @Override
+ protected DataFile asFile() {
return this;
}
@Override
- public Long pos() {
+ public List<Integer> equalityFieldIds() {
return null;
}
-
- @Override
- public int specId() {
- return -1;
- }
-
- @Override
- public CharSequence path() {
- return wrapped.getAs(filePathPosition);
- }
-
- @Override
- public FileFormat format() {
- return FileFormat.fromString(wrapped.getString(fileFormatPosition));
- }
-
- @Override
- public StructLike partition() {
- return partitionProjection;
- }
-
- @Override
- public long recordCount() {
- return wrapped.getAs(recordCountPosition);
- }
-
- @Override
- public long fileSizeInBytes() {
- return wrapped.getAs(fileSizeInBytesPosition);
- }
-
- @Override
- public Map<Integer, Long> columnSizes() {
- return wrapped.isNullAt(columnSizesPosition) ? null : wrapped.getJavaMap(columnSizesPosition);
- }
-
- @Override
- public Map<Integer, Long> valueCounts() {
- return wrapped.isNullAt(valueCountsPosition) ? null : wrapped.getJavaMap(valueCountsPosition);
- }
-
- @Override
- public Map<Integer, Long> nullValueCounts() {
- return wrapped.isNullAt(nullValueCountsPosition)
- ? null
- : wrapped.getJavaMap(nullValueCountsPosition);
- }
-
- @Override
- public Map<Integer, Long> nanValueCounts() {
- return wrapped.isNullAt(nanValueCountsPosition)
- ? null
- : wrapped.getJavaMap(nanValueCountsPosition);
- }
-
- @Override
- public Map<Integer, ByteBuffer> lowerBounds() {
- Map<?, ?> lowerBounds =
- wrapped.isNullAt(lowerBoundsPosition) ? null : wrapped.getJavaMap(lowerBoundsPosition);
- return convert(lowerBoundsType, lowerBounds);
- }
-
- @Override
- public Map<Integer, ByteBuffer> upperBounds() {
- Map<?, ?> upperBounds =
- wrapped.isNullAt(upperBoundsPosition) ? null : wrapped.getJavaMap(upperBoundsPosition);
- return convert(upperBoundsType, upperBounds);
- }
-
- @Override
- public ByteBuffer keyMetadata() {
- return convert(keyMetadataType, wrapped.get(keyMetadataPosition));
- }
-
- @Override
- public DataFile copy() {
- throw new UnsupportedOperationException("Not implemented: copy");
- }
-
- @Override
- public DataFile copyWithoutStats() {
- throw new UnsupportedOperationException("Not implemented: copyWithoutStats");
- }
-
- @Override
- public List<Long> splitOffsets() {
- return wrapped.isNullAt(splitOffsetsPosition) ? null : wrapped.getList(splitOffsetsPosition);
- }
-
- @Override
- public Integer sortOrderId() {
- return wrapped.getAs(sortOrderIdPosition);
- }
-
- private int fieldPosition(String name, StructType sparkType) {
- try {
- return sparkType.fieldIndex(name);
- } catch (IllegalArgumentException e) {
- // the partition field is absent for unpartitioned tables
- if (name.equals(DataFile.PARTITION_NAME) && wrappedPartition.size() == 0) {
- return -1;
- }
- throw e;
- }
- }
-
- @SuppressWarnings("unchecked")
- private <T> T convert(Type valueType, Object value) {
- return (T) SparkValueConverter.convert(valueType, value);
- }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDeleteFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDeleteFile.java
new file mode 100644
index 0000000000..6250a16306
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDeleteFile.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.StructType;
+
+public class SparkDeleteFile extends SparkContentFile<DeleteFile> implements DeleteFile {
+
+ public SparkDeleteFile(Types.StructType type, StructType sparkType) {
+ super(type, null, sparkType);
+ }
+
+ public SparkDeleteFile(
+ Types.StructType type, Types.StructType projectedType, StructType sparkType) {
+ super(type, projectedType, sparkType);
+ }
+
+ @Override
+ protected DeleteFile asFile() {
+ return this;
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 0c08324a1a..5b1d616569 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -21,15 +21,19 @@ package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.MetadataTableType.ENTRIES;
import java.io.Serializable;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
@@ -50,7 +54,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkContentFile;
import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
@@ -85,8 +91,12 @@ public class RewriteManifestsSparkAction
public static final boolean USE_CACHING_DEFAULT = false;
private static final Logger LOG =
LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
+ private static final RewriteManifests.Result EMPTY_RESULT =
+ ImmutableRewriteManifests.Result.builder()
+ .rewrittenManifests(ImmutableList.of())
+ .addedManifests(ImmutableList.of())
+ .build();
- private final Encoder<ManifestFile> manifestEncoder;
private final Table table;
private final int formatVersion;
private final long targetManifestSizeBytes;
@@ -98,7 +108,6 @@ public class RewriteManifestsSparkAction
RewriteManifestsSparkAction(SparkSession spark, Table table) {
super(spark);
- this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
this.table = table;
this.spec = table.spec();
this.targetManifestSizeBytes =
@@ -159,34 +168,49 @@ public class RewriteManifestsSparkAction
}
private RewriteManifests.Result doExecute() {
- List<ManifestFile> matchingManifests = findMatchingManifests();
+ List<ManifestFile> rewrittenManifests = Lists.newArrayList();
+ List<ManifestFile> addedManifests = Lists.newArrayList();
+
+ RewriteManifests.Result dataResult = rewriteManifests(ManifestContent.DATA);
+ Iterables.addAll(rewrittenManifests, dataResult.rewrittenManifests());
+ Iterables.addAll(addedManifests, dataResult.addedManifests());
+
+ RewriteManifests.Result deletesResult = rewriteManifests(ManifestContent.DELETES);
+ Iterables.addAll(rewrittenManifests, deletesResult.rewrittenManifests());
+ Iterables.addAll(addedManifests, deletesResult.addedManifests());
+
+ if (rewrittenManifests.isEmpty()) {
+ return EMPTY_RESULT;
+ }
+
+ replaceManifests(rewrittenManifests, addedManifests);
+
+ return ImmutableRewriteManifests.Result.builder()
+ .rewrittenManifests(rewrittenManifests)
+ .addedManifests(addedManifests)
+ .build();
+ }
+
+ private RewriteManifests.Result rewriteManifests(ManifestContent content) {
+ List<ManifestFile> matchingManifests = findMatchingManifests(content);
if (matchingManifests.isEmpty()) {
- return ImmutableRewriteManifests.Result.builder()
- .addedManifests(ImmutableList.of())
- .rewrittenManifests(ImmutableList.of())
- .build();
+ return EMPTY_RESULT;
}
int targetNumManifests = targetNumManifests(totalSizeBytes(matchingManifests));
-
if (targetNumManifests == 1 && matchingManifests.size() == 1) {
- return ImmutableRewriteManifests.Result.builder()
- .addedManifests(ImmutableList.of())
- .rewrittenManifests(ImmutableList.of())
- .build();
+ return EMPTY_RESULT;
}
Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
List<ManifestFile> newManifests;
if (spec.isUnpartitioned()) {
- newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
+ newManifests = writeUnpartitionedManifests(content, manifestEntryDF, targetNumManifests);
} else {
- newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests);
+ newManifests = writePartitionedManifests(content, manifestEntryDF, targetNumManifests);
}
- replaceManifests(matchingManifests, newManifests);
-
return ImmutableRewriteManifests.Result.builder()
.rewrittenManifests(matchingManifests)
.addedManifests(newManifests)
@@ -215,41 +239,45 @@ public class RewriteManifestsSparkAction
.select("snapshot_id", "sequence_number", "file_sequence_number",
"data_file");
}
- private List<ManifestFile> writeManifestsForUnpartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests) {
+ private List<ManifestFile> writeUnpartitionedManifests(
+ ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
- StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
- Types.StructType combinedPartitionType = Partitioning.partitionType(table);
- Types.StructType partitionType = spec.partitionType();
-
- return manifestEntryDF
- .repartition(numManifests)
- .mapPartitions(
- toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
- manifestEncoder)
- .collectAsList();
+ WriteManifests<?> writeFunc = newWriteManifestsFunc(content, manifestEntryDF.schema());
+ Dataset<Row> transformedManifestEntryDF = manifestEntryDF.repartition(numManifests);
+ return writeFunc.apply(transformedManifestEntryDF).collectAsList();
}
- private List<ManifestFile> writeManifestsForPartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests) {
-
- StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
- Types.StructType combinedPartitionType = Partitioning.partitionType(table);
- Types.StructType partitionType = spec.partitionType();
+ private List<ManifestFile> writePartitionedManifests(
+ ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
return withReusableDS(
manifestEntryDF,
df -> {
+ WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
Column partitionColumn = df.col("data_file.partition");
- return df.repartitionByRange(numManifests, partitionColumn)
- .sortWithinPartitions(partitionColumn)
- .mapPartitions(
- toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
- manifestEncoder)
- .collectAsList();
+ Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+ return writeFunc.apply(transformedDF).collectAsList();
});
}
+ private WriteManifests<?> newWriteManifestsFunc(ManifestContent content, StructType sparkType) {
+ ManifestWriterFactory writers = manifestWriters();
+
+ StructType sparkFileType = (StructType) sparkType.apply("data_file").dataType();
+ Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table));
+ Types.StructType fileType = DataFile.getType(spec.partitionType());
+
+ if (content == ManifestContent.DATA) {
+ return new WriteDataManifests(writers, combinedFileType, fileType, sparkFileType);
+ } else {
+ return new WriteDeleteManifests(writers, combinedFileType, fileType, sparkFileType);
+ }
+ }
+
+ private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
+ return df.repartitionByRange(numPartitions, col).sortWithinPartitions(col);
+ }
+
private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
boolean useCaching =
PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
@@ -264,18 +292,31 @@ public class RewriteManifestsSparkAction
}
}
- private List<ManifestFile> findMatchingManifests() {
+ private List<ManifestFile> findMatchingManifests(ManifestContent content) {
Snapshot currentSnapshot = table.currentSnapshot();
if (currentSnapshot == null) {
return ImmutableList.of();
}
- return currentSnapshot.dataManifests(table.io()).stream()
+ List<ManifestFile> manifests = loadManifests(content, currentSnapshot);
+
+ return manifests.stream()
.filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
.collect(Collectors.toList());
}
+ private List<ManifestFile> loadManifests(ManifestContent content, Snapshot snapshot) {
+ switch (content) {
+ case DATA:
+ return snapshot.dataManifests(table.io());
+ case DELETES:
+ return snapshot.deleteManifests(table.io());
+ default:
+ throw new IllegalArgumentException("Unknown manifest content: " + content);
+ }
+ }
+
private int targetNumManifests(long totalSizeBytes) {
return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
}
@@ -339,18 +380,82 @@ public class RewriteManifestsSparkAction
(long) (1.2 * targetManifestSizeBytes));
}
- private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- ManifestWriterFactory writers,
- Types.StructType combinedPartitionType,
- Types.StructType partitionType,
- StructType sparkType) {
+ private static class WriteDataManifests extends WriteManifests<DataFile> {
+
+ WriteDataManifests(
+ ManifestWriterFactory manifestWriters,
+ Types.StructType combinedPartitionType,
+ Types.StructType partitionType,
+ StructType sparkFileType) {
+ super(manifestWriters, combinedPartitionType, partitionType, sparkFileType);
+ }
+
+ @Override
+ protected SparkDataFile newFileWrapper() {
+ return new SparkDataFile(combinedFileType(), fileType(), sparkFileType());
+ }
+
+ @Override
+ protected RollingManifestWriter<DataFile> newManifestWriter() {
+ return writers().newRollingManifestWriter();
+ }
+ }
+
+ private static class WriteDeleteManifests extends WriteManifests<DeleteFile> {
+
+ WriteDeleteManifests(
+ ManifestWriterFactory manifestWriters,
+ Types.StructType combinedFileType,
+ Types.StructType fileType,
+ StructType sparkFileType) {
+ super(manifestWriters, combinedFileType, fileType, sparkFileType);
+ }
+
+ @Override
+ protected SparkDeleteFile newFileWrapper() {
+ return new SparkDeleteFile(combinedFileType(), fileType(), sparkFileType());
+ }
+
+ @Override
+ protected RollingManifestWriter<DeleteFile> newManifestWriter() {
+ return writers().newRollingDeleteManifestWriter();
+ }
+ }
+
+ private abstract static class WriteManifests<F extends ContentFile<F>>
+ implements MapPartitionsFunction<Row, ManifestFile> {
+
+ private static final Encoder<ManifestFile> MANIFEST_ENCODER =
+ Encoders.javaSerialization(ManifestFile.class);
+
+ private final ManifestWriterFactory writers;
+ private final Types.StructType combinedFileType;
+ private final Types.StructType fileType;
+ private final StructType sparkFileType;
+
+ WriteManifests(
+ ManifestWriterFactory writers,
+ Types.StructType combinedFileType,
+ Types.StructType fileType,
+ StructType sparkFileType) {
+ this.writers = writers;
+ this.combinedFileType = combinedFileType;
+ this.fileType = fileType;
+ this.sparkFileType = sparkFileType;
+ }
+
+ protected abstract SparkContentFile<F> newFileWrapper();
- return rows -> {
- Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
- Types.StructType manifestFileType = DataFile.getType(partitionType);
- SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
+ protected abstract RollingManifestWriter<F> newManifestWriter();
- RollingManifestWriter<DataFile> writer = writers.newRollingManifestWriter();
+ public Dataset<ManifestFile> apply(Dataset<Row> input) {
+ return input.mapPartitions(this, MANIFEST_ENCODER);
+ }
+
+ @Override
+ public Iterator<ManifestFile> call(Iterator<Row> rows) throws Exception {
+ SparkContentFile<F> fileWrapper = newFileWrapper();
+ RollingManifestWriter<F> writer = newManifestWriter();
try {
while (rows.hasNext()) {
@@ -359,14 +464,30 @@ public class RewriteManifestsSparkAction
long sequenceNumber = row.getLong(1);
Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
Row file = row.getStruct(3);
- writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
+ writer.existing(fileWrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
}
} finally {
writer.close();
}
return writer.toManifestFiles().iterator();
- };
+ }
+
+ protected ManifestWriterFactory writers() {
+ return writers;
+ }
+
+ protected Types.StructType combinedFileType() {
+ return combinedFileType;
+ }
+
+ protected Types.StructType fileType() {
+ return fileType;
+ }
+
+ protected StructType sparkFileType() {
+ return sparkFileType;
+ }
}
private static class ManifestWriterFactory implements Serializable {
@@ -397,6 +518,14 @@ public class RewriteManifestsSparkAction
return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
}
+ public RollingManifestWriter<DeleteFile> newRollingDeleteManifestWriter() {
+ return new RollingManifestWriter<>(this::newDeleteManifestWriter, maxManifestSizeBytes);
+ }
+
+ private ManifestWriter<DeleteFile> newDeleteManifestWriter() {
+ return ManifestFiles.writeDeleteManifest(formatVersion, spec(), newOutputFile(), null);
+ }
+
private PartitionSpec spec() {
return table().specs().get(specId);
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index d3932c2e82..e7326c73e8 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -38,10 +38,15 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
@@ -49,6 +54,9 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.OutputFile;
@@ -61,6 +69,8 @@ import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -649,6 +659,255 @@ public class TestRewriteManifestsAction extends SparkTestBase {
assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
}
+ @Test
+ public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+ options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ // commit data records
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(
+ new ThreeColumnRecord(1, null, "AAAA"),
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"));
+ writeRecords(records);
+
+ // commit a position delete file to remove records where c1 = 1 OR c1 = 2
+ List<Pair<CharSequence, Long>> posDeletes = generatePosDeletes("c1 = 1 OR c1 = 2");
+ Pair<DeleteFile, CharSequenceSet> posDeleteWriteResult = writePosDeletes(table, posDeletes);
+ table
+ .newRowDelta()
+ .addDeletes(posDeleteWriteResult.first())
+ .validateDataFilesExist(posDeleteWriteResult.second())
+ .commit();
+
+ // commit an equality delete file to remove all records where c1 = 3
+ DeleteFile eqDeleteFile = writeEqDeletes(table, "c1", 3);
+ table.newRowDelta().addDeletes(eqDeleteFile).commit();
+
+ // the current snapshot should contain 1 data manifest and 2 delete manifests
+ List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+ assertThat(originalManifests).hasSize(3);
+
+ SparkActions actions = SparkActions.get();
+
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
+
+ // the original delete manifests must be combined
+ assertThat(result.rewrittenManifests())
+ .hasSize(2)
+ .allMatch(m -> m.content() == ManifestContent.DELETES);
+ assertThat(result.addedManifests())
+ .hasSize(1)
+ .allMatch(m -> m.content() == ManifestContent.DELETES);
+ assertManifestsLocation(result.addedManifests());
+
+ // the new delete manifest must only contain files with status EXISTING
+ ManifestFile deleteManifest =
+ Iterables.getOnlyElement(table.currentSnapshot().deleteManifests(table.io()));
+ assertThat(deleteManifest.existingFilesCount()).isEqualTo(2);
+ assertThat(deleteManifest.hasAddedFiles()).isFalse();
+ assertThat(deleteManifest.hasDeletedFiles()).isFalse();
+
+ // the preserved data manifest must only contain files with status ADDED
+ ManifestFile dataManifest =
+ Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+ assertThat(dataManifest.hasExistingFiles()).isFalse();
+ assertThat(dataManifest.hasAddedFiles()).isTrue();
+ assertThat(dataManifest.hasDeletedFiles()).isFalse();
+
+ // the table must produce expected records after the rewrite
+ List<ThreeColumnRecord> expectedRecords =
+ Lists.newArrayList(new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"));
+ assertThat(actualRecords()).isEqualTo(expectedRecords);
+ }
+
+ @Test
+ public void testRewriteSmallDeleteManifestsPartitionedTable() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+ options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+ options.put(TableProperties.MANIFEST_MERGE_ENABLED, "false");
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ // commit data records
+ List<ThreeColumnRecord> records =
+ Lists.newArrayList(
+ new ThreeColumnRecord(1, null, "AAAA"),
+ new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"),
+ new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"),
+ new ThreeColumnRecord(5, "EEEEEEEEEE", "EEEE"));
+ writeRecords(records);
+
+ // commit the first position delete file to remove records where c1 = 1
+ List<Pair<CharSequence, Long>> posDeletes1 = generatePosDeletes("c1 = 1");
+ Pair<DeleteFile, CharSequenceSet> posDeleteWriteResult1 =
+ writePosDeletes(table, TestHelpers.Row.of("AAAA"), posDeletes1);
+ table
+ .newRowDelta()
+ .addDeletes(posDeleteWriteResult1.first())
+ .validateDataFilesExist(posDeleteWriteResult1.second())
+ .commit();
+
+ // commit the second position delete file to remove records where c1 = 2
+ List<Pair<CharSequence, Long>> posDeletes2 = generatePosDeletes("c1 = 2");
+ Pair<DeleteFile, CharSequenceSet> positionDeleteWriteResult2 =
+ writePosDeletes(table, TestHelpers.Row.of("BBBB"), posDeletes2);
+ table
+ .newRowDelta()
+ .addDeletes(positionDeleteWriteResult2.first())
+ .validateDataFilesExist(positionDeleteWriteResult2.second())
+ .commit();
+
+ // commit the first equality delete file to remove records where c1 = 3
+ DeleteFile eqDeleteFile1 = writeEqDeletes(table, TestHelpers.Row.of("CCCC"), "c1", 3);
+ table.newRowDelta().addDeletes(eqDeleteFile1).commit();
+
+ // commit the second equality delete file to remove records where c1 = 4
+ DeleteFile eqDeleteFile2 = writeEqDeletes(table, TestHelpers.Row.of("DDDD"), "c1", 4);
+ table.newRowDelta().addDeletes(eqDeleteFile2).commit();
+
+ // the table must have 1 data manifest and 4 delete manifests
+ List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+ assertThat(originalManifests).hasSize(5);
+
+ // set the target manifest size to have 2 manifests with 2 entries in each after the rewrite
+ List<ManifestFile> originalDeleteManifests =
+ table.currentSnapshot().deleteManifests(table.io());
+ long manifestEntrySizeBytes = computeManifestEntrySizeBytes(originalDeleteManifests);
+ long targetManifestSizeBytes = (long) (1.05 * 2 * manifestEntrySizeBytes);
+
+ table
+ .updateProperties()
+ .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, String.valueOf(targetManifestSizeBytes))
+ .commit();
+
+ SparkActions actions = SparkActions.get();
+
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> manifest.content() == ManifestContent.DELETES)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
+
+ // the original 4 delete manifests must be replaced with 2 new delete manifests
+ assertThat(result.rewrittenManifests())
+ .hasSize(4)
+ .allMatch(m -> m.content() == ManifestContent.DELETES);
+ assertThat(result.addedManifests())
+ .hasSize(2)
+ .allMatch(m -> m.content() == ManifestContent.DELETES);
+ assertManifestsLocation(result.addedManifests());
+
+ List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
+ assertThat(deleteManifests).hasSize(2);
+
+ // the first new delete manifest must only contain files with status EXISTING
+ ManifestFile deleteManifest1 = deleteManifests.get(0);
+ assertThat(deleteManifest1.existingFilesCount()).isEqualTo(2);
+ assertThat(deleteManifest1.hasAddedFiles()).isFalse();
+ assertThat(deleteManifest1.hasDeletedFiles()).isFalse();
+
+ // the second new delete manifest must only contain files with status EXISTING
+ ManifestFile deleteManifest2 = deleteManifests.get(1);
+ assertThat(deleteManifest2.existingFilesCount()).isEqualTo(2);
+ assertThat(deleteManifest2.hasAddedFiles()).isFalse();
+ assertThat(deleteManifest2.hasDeletedFiles()).isFalse();
+
+ // the table must produce expected records after the rewrite
+ List<ThreeColumnRecord> expectedRecords =
+ Lists.newArrayList(new ThreeColumnRecord(5, "EEEEEEEEEE", "EEEE"));
+ assertThat(actualRecords()).isEqualTo(expectedRecords);
+ }
+
+ @Test
+ public void testRewriteLargeDeleteManifestsPartitionedTable() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+ options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ // generate enough delete files to have a reasonably sized manifest
+ List<DeleteFile> deleteFiles = Lists.newArrayList();
+ for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+ DeleteFile deleteFile = newDeleteFile(table, "c3=" + fileOrdinal);
+ deleteFiles.add(deleteFile);
+ }
+
+ // commit delete files
+ RowDelta rowDelta = table.newRowDelta();
+ for (DeleteFile deleteFile : deleteFiles) {
+ rowDelta.addDeletes(deleteFile);
+ }
+ rowDelta.commit();
+
+ // the current snapshot should contain only 1 delete manifest
+ List<ManifestFile> originalDeleteManifests =
+ table.currentSnapshot().deleteManifests(table.io());
+ ManifestFile originalDeleteManifest = Iterables.getOnlyElement(originalDeleteManifests);
+
+ // set the target manifest size to a small value to force splitting records into multiple files
+ table
+ .updateProperties()
+ .set(
+ TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+ String.valueOf(originalDeleteManifest.length() / 2))
+ .commit();
+
+ SparkActions actions = SparkActions.get();
+
+ String stagingLocation = temp.newFolder().toString();
+
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .stagingLocation(stagingLocation)
+ .execute();
+
+ // the action must rewrite the original delete manifest and add at least 2 new ones
+ assertThat(result.rewrittenManifests())
+ .hasSize(1)
+ .allMatch(m -> m.content() == ManifestContent.DELETES);
+ assertThat(result.addedManifests())
+ .hasSizeGreaterThanOrEqualTo(2)
+ .allMatch(m -> m.content() == ManifestContent.DELETES);
+ assertManifestsLocation(result.addedManifests(), stagingLocation);
+
+ // the current snapshot must return the correct number of delete manifests
+ List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
+ assertThat(deleteManifests).hasSizeGreaterThanOrEqualTo(2);
+ }
+
+ private List<ThreeColumnRecord> actualRecords() {
+ return spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation)
+ .as(Encoders.bean(ThreeColumnRecord.class))
+ .sort("c1", "c2", "c3")
+ .collectAsList();
+ }
+
private void writeRecords(List<ThreeColumnRecord> records) {
Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
writeDF(df);
@@ -721,4 +980,63 @@ public class TestRewriteManifestsAction extends SparkTestBase {
.withFileSizeInBytes(10)
.withRecordCount(1);
}
+
+ private DeleteFile newDeleteFile(Table table, String partitionPath) {
+ return FileMetadata.deleteFileBuilder(table.spec())
+ .ofPositionDeletes()
+ .withPath("/path/to/pos-deletes-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(5)
+ .withPartitionPath(partitionPath)
+ .withRecordCount(1)
+ .build();
+ }
+
+ private List<Pair<CharSequence, Long>> generatePosDeletes(String predicate) {
+ List<Row> rows =
+ spark
+ .read()
+ .format("iceberg")
+ .load(tableLocation)
+ .selectExpr("_file", "_pos")
+ .where(predicate)
+ .collectAsList();
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+
+ for (Row row : rows) {
+ deletes.add(Pair.of(row.getString(0), row.getLong(1)));
+ }
+
+ return deletes;
+ }
+
+ private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+ Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+ return writePosDeletes(table, null, deletes);
+ }
+
+ private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+ Table table, StructLike partition, List<Pair<CharSequence, Long>> deletes)
+ throws IOException {
+ OutputFile outputFile = Files.localOutput(temp.newFile());
+ return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes);
+ }
+
+ private DeleteFile writeEqDeletes(Table table, String key, Object... values) throws IOException {
+ return writeEqDeletes(table, null, key, values);
+ }
+
+ private DeleteFile writeEqDeletes(Table table, StructLike partition, String key, Object... values)
+ throws IOException {
+ List<Record> deletes = Lists.newArrayList();
+ Schema deleteSchema = table.schema().select(key);
+ Record delete = GenericRecord.create(deleteSchema);
+
+ for (Object value : values) {
+ deletes.add(delete.copy(key, value));
+ }
+
+ OutputFile outputFile = Files.localOutput(temp.newFile());
+ return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, deleteSchema);
+ }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index 1f8227c94e..b894d32326 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -20,32 +20,46 @@ package org.apache.iceberg.spark.source;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -57,7 +71,6 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -133,13 +146,13 @@ public class TestSparkDataFile {
public void testValueConversion() throws IOException {
Table table =
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
- checkSparkDataFile(table);
+ checkSparkContentFiles(table);
}
@Test
public void testValueConversionPartitionedTable() throws IOException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
- checkSparkDataFile(table);
+ checkSparkContentFiles(table);
}
@Test
@@ -147,10 +160,10 @@ public class TestSparkDataFile {
Map<String, String> props = Maps.newHashMap();
props.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation);
- checkSparkDataFile(table);
+ checkSparkContentFiles(table);
}
- private void checkSparkDataFile(Table table) throws IOException {
+ private void checkSparkContentFiles(Table table) throws IOException {
Iterable<InternalRow> rows = RandomData.generateSpark(table.schema(), 200, 0);
JavaRDD<InternalRow> rdd = sparkContext.parallelize(Lists.newArrayList(rows));
Dataset<Row> df =
@@ -161,66 +174,167 @@ public class TestSparkDataFile {
table.refresh();
+ PartitionSpec dataFilesSpec = table.spec();
+
List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
+ assertThat(manifests).hasSize(1);
List<DataFile> dataFiles = Lists.newArrayList();
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
for (DataFile dataFile : reader) {
- checkDataFile(dataFile.copy(), DataFiles.builder(table.spec()).copy(dataFile).build());
+ checkDataFile(dataFile.copy(), DataFiles.builder(dataFilesSpec).copy(dataFile).build());
dataFiles.add(dataFile.copy());
}
}
- Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#files");
+ UpdatePartitionSpec updateSpec = table.updateSpec();
+ for (PartitionField field : dataFilesSpec.fields()) {
+ updateSpec.removeField(field.name());
+ }
+ updateSpec.commit();
- // reorder columns to test arbitrary projections
- List<Column> columns =
- Arrays.stream(dataFileDF.columns()).map(ColumnName::new).collect(Collectors.toList());
- Collections.shuffle(columns);
+ List<DeleteFile> positionDeleteFiles = Lists.newArrayList();
+ List<DeleteFile> equalityDeleteFiles = Lists.newArrayList();
+
+ RowDelta rowDelta = table.newRowDelta();
+
+ for (DataFile dataFile : dataFiles) {
+ DeleteFile positionDeleteFile = createPositionDeleteFile(table, dataFile);
+ positionDeleteFiles.add(positionDeleteFile);
+ rowDelta.addDeletes(positionDeleteFile);
+ }
+
+ DeleteFile equalityDeleteFile1 = createEqualityDeleteFile(table);
+ equalityDeleteFiles.add(equalityDeleteFile1);
+ rowDelta.addDeletes(equalityDeleteFile1);
- List<Row> sparkDataFiles =
- dataFileDF.select(Iterables.toArray(columns, Column.class)).collectAsList();
+ DeleteFile equalityDeleteFile2 = createEqualityDeleteFile(table);
+ equalityDeleteFiles.add(equalityDeleteFile2);
+ rowDelta.addDeletes(equalityDeleteFile2);
- Assert.assertEquals(
- "The number of files should match", dataFiles.size(),
sparkDataFiles.size());
+ rowDelta.commit();
- Types.StructType dataFileType = DataFile.getType(table.spec().partitionType());
+ Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#data_files");
+ List<Row> sparkDataFiles = shuffleColumns(dataFileDF).collectAsList();
+ assertThat(sparkDataFiles).hasSameSizeAs(dataFiles);
+
+ Types.StructType dataFileType = DataFile.getType(dataFilesSpec.partitionType());
StructType sparkDataFileType = sparkDataFiles.get(0).schema();
- SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkDataFileType);
+ SparkDataFile dataFileWrapper = new SparkDataFile(dataFileType, sparkDataFileType);
for (int i = 0; i < dataFiles.size(); i++) {
- checkDataFile(dataFiles.get(i), wrapper.wrap(sparkDataFiles.get(i)));
+ checkDataFile(dataFiles.get(i), dataFileWrapper.wrap(sparkDataFiles.get(i)));
+ }
+
+ Dataset<Row> positionDeleteFileDF =
+ spark.read().format("iceberg").load(tableLocation +
"#delete_files").where("content = 1");
+ List<Row> sparkPositionDeleteFiles =
shuffleColumns(positionDeleteFileDF).collectAsList();
+ assertThat(sparkPositionDeleteFiles).hasSameSizeAs(positionDeleteFiles);
+
+ Types.StructType positionDeleteFileType = DataFile.getType(dataFilesSpec.partitionType());
+ StructType sparkPositionDeleteFileType = sparkPositionDeleteFiles.get(0).schema();
+ SparkDeleteFile positionDeleteFileWrapper =
+ new SparkDeleteFile(positionDeleteFileType, sparkPositionDeleteFileType);
+
+ for (int i = 0; i < positionDeleteFiles.size(); i++) {
+ checkDeleteFile(
+ positionDeleteFiles.get(i),
+ positionDeleteFileWrapper.wrap(sparkPositionDeleteFiles.get(i)));
+ }
+
+ Dataset<Row> equalityDeleteFileDF =
+ spark.read().format("iceberg").load(tableLocation +
"#delete_files").where("content = 2");
+ List<Row> sparkEqualityDeleteFiles =
shuffleColumns(equalityDeleteFileDF).collectAsList();
+ assertThat(sparkEqualityDeleteFiles).hasSameSizeAs(equalityDeleteFiles);
+
+ Types.StructType equalityDeleteFileType = DataFile.getType(table.spec().partitionType());
+ StructType sparkEqualityDeleteFileType = sparkEqualityDeleteFiles.get(0).schema();
+ SparkDeleteFile equalityDeleteFileWrapper =
+ new SparkDeleteFile(equalityDeleteFileType, sparkEqualityDeleteFileType);
+
+ for (int i = 0; i < equalityDeleteFiles.size(); i++) {
+ checkDeleteFile(
+ equalityDeleteFiles.get(i),
+ equalityDeleteFileWrapper.wrap(sparkEqualityDeleteFiles.get(i)));
}
}
+ private Dataset<Row> shuffleColumns(Dataset<Row> df) {
+ List<Column> columns =
+ Arrays.stream(df.columns()).map(ColumnName::new).collect(Collectors.toList());
+ Collections.shuffle(columns);
+ return df.select(columns.toArray(new Column[0]));
+ }
+
private void checkDataFile(DataFile expected, DataFile actual) {
- Assert.assertEquals("Path must match", expected.path(), actual.path());
- Assert.assertEquals("Format must match", expected.format(),
actual.format());
- Assert.assertEquals("Record count must match", expected.recordCount(),
actual.recordCount());
- Assert.assertEquals("Size must match", expected.fileSizeInBytes(),
actual.fileSizeInBytes());
- Assert.assertEquals(
- "Record value counts must match", expected.valueCounts(),
actual.valueCounts());
- Assert.assertEquals(
- "Record null value counts must match",
- expected.nullValueCounts(),
- actual.nullValueCounts());
- Assert.assertEquals(
- "Record nan value counts must match", expected.nanValueCounts(),
actual.nanValueCounts());
- Assert.assertEquals("Lower bounds must match", expected.lowerBounds(),
actual.lowerBounds());
- Assert.assertEquals("Upper bounds must match", expected.upperBounds(),
actual.upperBounds());
- Assert.assertEquals("Key metadata must match", expected.keyMetadata(),
actual.keyMetadata());
- Assert.assertEquals("Split offsets must match", expected.splitOffsets(),
actual.splitOffsets());
- Assert.assertEquals("Sort order id must match", expected.sortOrderId(),
actual.sortOrderId());
+ assertThat(expected.equalityFieldIds()).isNull();
+ assertThat(actual.equalityFieldIds()).isNull();
+ checkContentFile(expected, actual);
+ checkStructLike(expected.partition(), actual.partition());
+ }
+ private void checkDeleteFile(DeleteFile expected, DeleteFile actual) {
+ assertThat(expected.equalityFieldIds()).isEqualTo(actual.equalityFieldIds());
+ checkContentFile(expected, actual);
checkStructLike(expected.partition(), actual.partition());
}
+ private void checkContentFile(ContentFile<?> expected, ContentFile<?> actual) {
+ assertThat(actual.content()).isEqualTo(expected.content());
+ assertThat(actual.path()).isEqualTo(expected.path());
+ assertThat(actual.format()).isEqualTo(expected.format());
+ assertThat(actual.recordCount()).isEqualTo(expected.recordCount());
+ assertThat(actual.fileSizeInBytes()).isEqualTo(expected.fileSizeInBytes());
+ assertThat(actual.valueCounts()).isEqualTo(expected.valueCounts());
+ assertThat(actual.nullValueCounts()).isEqualTo(expected.nullValueCounts());
+ assertThat(actual.nanValueCounts()).isEqualTo(expected.nanValueCounts());
+ assertThat(actual.lowerBounds()).isEqualTo(expected.lowerBounds());
+ assertThat(actual.upperBounds()).isEqualTo(expected.upperBounds());
+ assertThat(actual.keyMetadata()).isEqualTo(expected.keyMetadata());
+ assertThat(actual.splitOffsets()).isEqualTo(expected.splitOffsets());
+ assertThat(actual.sortOrderId()).isEqualTo(expected.sortOrderId());
+ }
+
private void checkStructLike(StructLike expected, StructLike actual) {
- Assert.assertEquals("Struct size should match", expected.size(),
actual.size());
+ assertThat(actual.size()).isEqualTo(expected.size());
for (int i = 0; i < expected.size(); i++) {
- Assert.assertEquals(
- "Struct values must match", expected.get(i, Object.class),
actual.get(i, Object.class));
+ assertThat(actual.get(i, Object.class)).isEqualTo(expected.get(i,
Object.class));
}
}
+
+ private DeleteFile createPositionDeleteFile(Table table, DataFile dataFile) {
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ return FileMetadata.deleteFileBuilder(spec)
+ .ofPositionDeletes()
+ .withPath("/path/to/pos-deletes-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(dataFile.fileSizeInBytes() / 4)
+ .withPartition(dataFile.partition())
+ .withRecordCount(2)
+ .withMetrics(
+ new Metrics(
+ 2L,
+ null, // no column sizes
+ null, // no value counts
+ null, // no null counts
+ null, // no NaN counts
+ ImmutableMap.of(
+ MetadataColumns.DELETE_FILE_PATH.fieldId(),
+ Conversions.toByteBuffer(Types.StringType.get(), dataFile.path())),
+ ImmutableMap.of(
+ MetadataColumns.DELETE_FILE_PATH.fieldId(),
+ Conversions.toByteBuffer(Types.StringType.get(), dataFile.path()))))
+ .withEncryptionKeyMetadata(ByteBuffer.allocate(4).putInt(35))
+ .build();
+ }
+
+ private DeleteFile createEqualityDeleteFile(Table table) {
+ return FileMetadata.deleteFileBuilder(table.spec())
+ .ofEqualityDeletes(3, 4)
+ .withPath("/path/to/eq-deletes-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(250)
+ .withRecordCount(1)
+ .withSortOrder(SortOrder.unsorted())
+ .withEncryptionKeyMetadata(ByteBuffer.allocate(4).putInt(35))
+ .build();
+ }
}