This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f970a8396 Core: Support appending files with different specs (#9860)
5f970a8396 is described below

commit 5f970a839674f68a4b2f07cdca012ab4a15566c0
Author: Farooq Qaiser <[email protected]>
AuthorDate: Thu Jul 18 19:27:06 2024 -0400

    Core: Support appending files with different specs (#9860)
---
 .../apache/iceberg/MergingSnapshotProducer.java    | 102 ++++++++++++---------
 .../java/org/apache/iceberg/TestMergeAppend.java   |  88 ++++++++++++++++++
 2 files changed, 145 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 1a4560416d..b4c0567ab7 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -42,6 +43,7 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Predicate;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
@@ -79,7 +81,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final ManifestFilterManager<DeleteFile> deleteFilterManager;
 
   // update data
-  private final List<DataFile> newDataFiles = Lists.newArrayList();
+  private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
   private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
   private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
   private Long newDataFilesDataSequenceNumber;
@@ -89,10 +91,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
   private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
   private Expression deleteExpression = Expressions.alwaysFalse();
-  private PartitionSpec dataSpec;
 
   // cache new data manifests after writing
-  private List<ManifestFile> cachedNewDataManifests = null;
+  private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
   private boolean hasNewDataFiles = false;
 
   // cache new manifests for delete files
@@ -105,7 +106,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     super(ops);
     this.tableName = tableName;
     this.ops = ops;
-    this.dataSpec = null;
     long targetSizeBytes =
         ops.current()
             .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
@@ -141,10 +141,18 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   protected PartitionSpec dataSpec() {
+    Set<PartitionSpec> specs = dataSpecs();
     Preconditions.checkState(
-        dataSpec != null, "Cannot determine partition spec: no data files have been added");
-    // the spec is set when the write is started
-    return dataSpec;
+        specs.size() == 1,
+        "Cannot return a single partition spec: data files with different partition specs have been added");
+    return specs.iterator().next();
+  }
+
+  protected Set<PartitionSpec> dataSpecs() {
+    Set<PartitionSpec> specs = newDataFilesBySpec.keySet();
+    Preconditions.checkState(
+        !specs.isEmpty(), "Cannot determine partition specs: no data files have been added");
+    return ImmutableSet.copyOf(specs);
   }
 
   protected Expression rowFilter() {
@@ -152,7 +160,12 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   protected List<DataFile> addedDataFiles() {
-    return ImmutableList.copyOf(newDataFiles);
+    return ImmutableList.copyOf(
+        newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList()));
+  }
+
+  protected Map<PartitionSpec, List<DataFile>> addedDataFilesBySpec() {
+    return ImmutableMap.copyOf(newDataFilesBySpec);
   }
 
   protected void failAnyDelete() {
@@ -212,7 +225,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   protected boolean addsDataFiles() {
-    return !newDataFiles.isEmpty();
+    return !newDataFilesBySpec.isEmpty();
   }
 
   protected boolean addsDeleteFiles() {
@@ -223,9 +236,17 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
     if (newDataFilePaths.add(file.path())) {
-      setDataSpec(file);
-      addedFilesSummary.addedFile(dataSpec(), file);
+      PartitionSpec fileSpec = ops.current().spec(file.specId());
+      Preconditions.checkArgument(
+          fileSpec != null,
+          "Cannot find partition spec %s for data file: %s",
+          file.specId(),
+          file.path());
+
+      addedFilesSummary.addedFile(fileSpec, file);
       hasNewDataFiles = true;
+      List<DataFile> newDataFiles =
+          newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
       newDataFiles.add(file);
     }
   }
@@ -255,17 +276,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     }
   }
 
-  private void setDataSpec(DataFile file) {
-    PartitionSpec fileSpec = ops.current().spec(file.specId());
-    Preconditions.checkNotNull(
-        fileSpec, "Cannot find partition spec for data file: %s", file.path());
-    if (dataSpec == null) {
-      dataSpec = fileSpec;
-    } else if (dataSpec.specId() != file.specId()) {
-      throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
-    }
-  }
-
   /** Add all files in a manifest to the new snapshot. */
   protected void add(ManifestFile manifest) {
     Preconditions.checkArgument(
@@ -885,7 +895,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
-    if (cachedNewDataManifests != null) {
+    if (!cachedNewDataManifests.isEmpty()) {
       boolean hasDeletes = false;
       for (ManifestFile manifest : cachedNewDataManifests) {
         if (!committed.contains(manifest)) {
@@ -895,7 +905,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
       }
 
       if (hasDeletes) {
-        this.cachedNewDataManifests = null;
+        this.cachedNewDataManifests.clear();
       }
     }
 
@@ -941,7 +951,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   private Iterable<ManifestFile> prepareNewDataManifests() {
     Iterable<ManifestFile> newManifests;
-    if (!newDataFiles.isEmpty()) {
+    if (!newDataFilesBySpec.isEmpty()) {
       List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
       newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
     } else {
@@ -954,29 +964,31 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   private List<ManifestFile> newDataFilesAsManifests() {
-    if (hasNewDataFiles && cachedNewDataManifests != null) {
+    if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
       cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
-      cachedNewDataManifests = null;
+      cachedNewDataManifests.clear();
     }
 
-    if (cachedNewDataManifests == null) {
-      try {
-        RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
-        try {
-          if (newDataFilesDataSequenceNumber == null) {
-            newDataFiles.forEach(writer::add);
-          } else {
-            newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
-          }
-        } finally {
-          writer.close();
-        }
-
-        this.cachedNewDataManifests = writer.toManifestFiles();
-        this.hasNewDataFiles = false;
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close manifest writer");
-      }
+    if (cachedNewDataManifests.isEmpty()) {
+      newDataFilesBySpec.forEach(
+          (dataSpec, newDataFiles) -> {
+            try {
+              RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
+              try {
+                if (newDataFilesDataSequenceNumber == null) {
+                  newDataFiles.forEach(writer::add);
+                } else {
+                  newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
+                }
+              } finally {
+                writer.close();
+              }
+              this.cachedNewDataManifests.addAll(writer.toManifestFiles());
+              this.hasNewDataFiles = false;
+            } catch (IOException e) {
+              throw new RuntimeIOException(e, "Failed to close manifest writer");
+            }
+          });
     }
 
     return cachedNewDataManifests;
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 4719923e72..abfcb31833 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -27,12 +27,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
@@ -90,6 +92,92 @@ public class TestMergeAppend extends TestBase {
         statuses(Status.ADDED, Status.ADDED));
   }
 
+  @TestTemplate
+  public void testEmptyTableAppendFilesWithDifferentSpecs() {
+    assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).as("Should not have a current snapshot").isNull();
+    assertThat(base.lastSequenceNumber()).as("Last sequence number should be 0").isEqualTo(0);
+
+    table.updateSpec().addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);
+
+    DataFile fileNewSpec =
+        DataFiles.builder(newSpec)
+            .withPath("/path/to/data-b.parquet")
+            .withPartitionPath("data_bucket=0/id=0")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    Snapshot committedSnapshot =
+        commit(table, table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec), branch);
+
+    assertThat(committedSnapshot).as("Should create a snapshot").isNotNull();
+    V1Assert.assertEquals(
+        "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber());
+    V2Assert.assertEquals(
+        "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());
+
+    assertThat(committedSnapshot.allManifests(table.io()))
+        .as("Should create 2 manifests for initial write, 1 manifest per spec")
+        .hasSize(2);
+
+    long snapshotId = committedSnapshot.snapshotId();
+
+    ImmutableMap<Integer, DataFile> expectedFileBySpec =
+        ImmutableMap.of(SPEC.specId(), FILE_A, newSpec.specId(), fileNewSpec);
+
+    expectedFileBySpec.forEach(
+        (specId, expectedDataFile) -> {
+          ManifestFile manifestFileForSpecId =
+              committedSnapshot.allManifests(table.io()).stream()
+                  .filter(m -> Objects.equals(m.partitionSpecId(), specId))
+                  .findAny()
+                  .get();
+
+          validateManifest(
+              manifestFileForSpecId,
+              dataSeqs(1L),
+              fileSeqs(1L),
+              ids(snapshotId),
+              files(expectedDataFile),
+              statuses(Status.ADDED));
+        });
+  }
+
+  @TestTemplate
+  public void testDataSpecThrowsExceptionIfDataFilesWithDifferentSpecsAreAdded() {
+    assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).as("Should not have a current snapshot").isNull();
+    assertThat(base.lastSequenceNumber()).as("Last sequence number should be 0").isEqualTo(0);
+
+    table.updateSpec().addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);
+
+    DataFile fileNewSpec =
+        DataFiles.builder(newSpec)
+            .withPath("/path/to/data-b.parquet")
+            .withPartitionPath("data_bucket=0/id=0")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    MergeAppend mergeAppend =
+        (MergeAppend) table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec);
+    assertThatThrownBy(mergeAppend::dataSpec)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage(
+            "Cannot return a single partition spec: data files with different partition specs have been added");
+  }
+
   @TestTemplate
   public void testEmptyTableAppendManifest() throws IOException {
     assertThat(listManifestFiles()).isEmpty();

Reply via email to