This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 5f970a8396 Core: Support appending files with different specs (#9860)
5f970a8396 is described below
commit 5f970a839674f68a4b2f07cdca012ab4a15566c0
Author: Farooq Qaiser <[email protected]>
AuthorDate: Thu Jul 18 19:27:06 2024 -0400
Core: Support appending files with different specs (#9860)
---
.../apache/iceberg/MergingSnapshotProducer.java | 102 ++++++++++++---------
.../java/org/apache/iceberg/TestMergeAppend.java | 88 ++++++++++++++++++
2 files changed, 145 insertions(+), 45 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 1a4560416d..b4c0567ab7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -42,6 +43,7 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
@@ -79,7 +81,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DeleteFile> deleteFilterManager;
// update data
- private final List<DataFile> newDataFiles = Lists.newArrayList();
+ private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec =
Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private Long newDataFilesDataSequenceNumber;
@@ -89,10 +91,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final SnapshotSummary.Builder addedFilesSummary =
SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary =
SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();
- private PartitionSpec dataSpec;
// cache new data manifests after writing
- private List<ManifestFile> cachedNewDataManifests = null;
+ private final List<ManifestFile> cachedNewDataManifests =
Lists.newLinkedList();
private boolean hasNewDataFiles = false;
// cache new manifests for delete files
@@ -105,7 +106,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
super(ops);
this.tableName = tableName;
this.ops = ops;
- this.dataSpec = null;
long targetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
@@ -141,10 +141,18 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
protected PartitionSpec dataSpec() {
+ Set<PartitionSpec> specs = dataSpecs();
Preconditions.checkState(
- dataSpec != null, "Cannot determine partition spec: no data files have
been added");
- // the spec is set when the write is started
- return dataSpec;
+ specs.size() == 1,
+ "Cannot return a single partition spec: data files with different
partition specs have been added");
+ return specs.iterator().next();
+ }
+
+ protected Set<PartitionSpec> dataSpecs() {
+ Set<PartitionSpec> specs = newDataFilesBySpec.keySet();
+ Preconditions.checkState(
+ !specs.isEmpty(), "Cannot determine partition specs: no data files
have been added");
+ return ImmutableSet.copyOf(specs);
}
protected Expression rowFilter() {
@@ -152,7 +160,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
protected List<DataFile> addedDataFiles() {
- return ImmutableList.copyOf(newDataFiles);
+ return ImmutableList.copyOf(
+
newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList()));
+ }
+
+ protected Map<PartitionSpec, List<DataFile>> addedDataFilesBySpec() {
+ return ImmutableMap.copyOf(newDataFilesBySpec);
}
protected void failAnyDelete() {
@@ -212,7 +225,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
protected boolean addsDataFiles() {
- return !newDataFiles.isEmpty();
+ return !newDataFilesBySpec.isEmpty();
}
protected boolean addsDeleteFiles() {
@@ -223,9 +236,17 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
- setDataSpec(file);
- addedFilesSummary.addedFile(dataSpec(), file);
+ PartitionSpec fileSpec = ops.current().spec(file.specId());
+ Preconditions.checkArgument(
+ fileSpec != null,
+ "Cannot find partition spec %s for data file: %s",
+ file.specId(),
+ file.path());
+
+ addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
+ List<DataFile> newDataFiles =
+ newDataFilesBySpec.computeIfAbsent(fileSpec, ignored ->
Lists.newArrayList());
newDataFiles.add(file);
}
}
@@ -255,17 +276,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
}
- private void setDataSpec(DataFile file) {
- PartitionSpec fileSpec = ops.current().spec(file.specId());
- Preconditions.checkNotNull(
- fileSpec, "Cannot find partition spec for data file: %s", file.path());
- if (dataSpec == null) {
- dataSpec = fileSpec;
- } else if (dataSpec.specId() != file.specId()) {
- throw new ValidationException("Invalid data file, expected spec id: %d",
dataSpec.specId());
- }
- }
-
/** Add all files in a manifest to the new snapshot. */
protected void add(ManifestFile manifest) {
Preconditions.checkArgument(
@@ -885,7 +895,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
- if (cachedNewDataManifests != null) {
+ if (!cachedNewDataManifests.isEmpty()) {
boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
if (!committed.contains(manifest)) {
@@ -895,7 +905,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
if (hasDeletes) {
- this.cachedNewDataManifests = null;
+ this.cachedNewDataManifests.clear();
}
}
@@ -941,7 +951,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private Iterable<ManifestFile> prepareNewDataManifests() {
Iterable<ManifestFile> newManifests;
- if (!newDataFiles.isEmpty()) {
+ if (!newDataFilesBySpec.isEmpty()) {
List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
newManifests = Iterables.concat(dataFileManifests, appendManifests,
rewrittenAppendManifests);
} else {
@@ -954,29 +964,31 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
private List<ManifestFile> newDataFilesAsManifests() {
- if (hasNewDataFiles && cachedNewDataManifests != null) {
+ if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
- cachedNewDataManifests = null;
+ cachedNewDataManifests.clear();
}
- if (cachedNewDataManifests == null) {
- try {
- RollingManifestWriter<DataFile> writer =
newRollingManifestWriter(dataSpec());
- try {
- if (newDataFilesDataSequenceNumber == null) {
- newDataFiles.forEach(writer::add);
- } else {
- newDataFiles.forEach(f -> writer.add(f,
newDataFilesDataSequenceNumber));
- }
- } finally {
- writer.close();
- }
-
- this.cachedNewDataManifests = writer.toManifestFiles();
- this.hasNewDataFiles = false;
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close manifest writer");
- }
+ if (cachedNewDataManifests.isEmpty()) {
+ newDataFilesBySpec.forEach(
+ (dataSpec, newDataFiles) -> {
+ try {
+ RollingManifestWriter<DataFile> writer =
newRollingManifestWriter(dataSpec);
+ try {
+ if (newDataFilesDataSequenceNumber == null) {
+ newDataFiles.forEach(writer::add);
+ } else {
+ newDataFiles.forEach(f -> writer.add(f,
newDataFilesDataSequenceNumber));
+ }
+ } finally {
+ writer.close();
+ }
+ this.cachedNewDataManifests.addAll(writer.toManifestFiles());
+ this.hasNewDataFiles = false;
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close manifest
writer");
+ }
+ });
}
return cachedNewDataManifests;
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 4719923e72..abfcb31833 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -27,12 +27,14 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
@@ -90,6 +92,92 @@ public class TestMergeAppend extends TestBase {
statuses(Status.ADDED, Status.ADDED));
}
+ @TestTemplate
+ public void testEmptyTableAppendFilesWithDifferentSpecs() {
+ assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+ TableMetadata base = readMetadata();
+ assertThat(base.currentSnapshot()).as("Should not have a current
snapshot").isNull();
+ assertThat(base.lastSequenceNumber()).as("Last sequence number should be
0").isEqualTo(0);
+
+ table.updateSpec().addField("id").commit();
+ PartitionSpec newSpec = table.spec();
+
+ assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);
+
+ DataFile fileNewSpec =
+ DataFiles.builder(newSpec)
+ .withPath("/path/to/data-b.parquet")
+ .withPartitionPath("data_bucket=0/id=0")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ Snapshot committedSnapshot =
+ commit(table,
table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec), branch);
+
+ assertThat(committedSnapshot).as("Should create a snapshot").isNotNull();
+ V1Assert.assertEquals(
+ "Last sequence number should be 0", 0,
table.ops().current().lastSequenceNumber());
+ V2Assert.assertEquals(
+ "Last sequence number should be 1", 1,
table.ops().current().lastSequenceNumber());
+
+ assertThat(committedSnapshot.allManifests(table.io()))
+ .as("Should create 2 manifests for initial write, 1 manifest per spec")
+ .hasSize(2);
+
+ long snapshotId = committedSnapshot.snapshotId();
+
+ ImmutableMap<Integer, DataFile> expectedFileBySpec =
+ ImmutableMap.of(SPEC.specId(), FILE_A, newSpec.specId(), fileNewSpec);
+
+ expectedFileBySpec.forEach(
+ (specId, expectedDataFile) -> {
+ ManifestFile manifestFileForSpecId =
+ committedSnapshot.allManifests(table.io()).stream()
+ .filter(m -> Objects.equals(m.partitionSpecId(), specId))
+ .findAny()
+ .get();
+
+ validateManifest(
+ manifestFileForSpecId,
+ dataSeqs(1L),
+ fileSeqs(1L),
+ ids(snapshotId),
+ files(expectedDataFile),
+ statuses(Status.ADDED));
+ });
+ }
+
+ @TestTemplate
+ public void
testDataSpecThrowsExceptionIfDataFilesWithDifferentSpecsAreAdded() {
+ assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+ TableMetadata base = readMetadata();
+ assertThat(base.currentSnapshot()).as("Should not have a current
snapshot").isNull();
+ assertThat(base.lastSequenceNumber()).as("Last sequence number should be
0").isEqualTo(0);
+
+ table.updateSpec().addField("id").commit();
+ PartitionSpec newSpec = table.spec();
+
+ assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);
+
+ DataFile fileNewSpec =
+ DataFiles.builder(newSpec)
+ .withPath("/path/to/data-b.parquet")
+ .withPartitionPath("data_bucket=0/id=0")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ MergeAppend mergeAppend =
+ (MergeAppend)
table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec);
+ assertThatThrownBy(mergeAppend::dataSpec)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Cannot return a single partition spec: data files with different
partition specs have been added");
+ }
+
@TestTemplate
public void testEmptyTableAppendManifest() throws IOException {
assertThat(listManifestFiles()).isEmpty();