DanielMorales9 commented on code in PR #32879:
URL: https://github.com/apache/beam/pull/32879#discussion_r1816398086


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########
@@ -97,36 +111,103 @@ private Catalog getCatalog() {
       return catalog;
     }
 
+    private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> 
fileWriteResults) {
+      int id = 
fileWriteResults.iterator().next().getSerializableDataFile().getPartitionSpecId();
+      for (FileWriteResult result : fileWriteResults) {
+        if (id != result.getSerializableDataFile().getPartitionSpecId()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @ProcessElement
     public void processElement(
         @Element KV<String, Iterable<FileWriteResult>> element,
         OutputReceiver<KV<String, SnapshotInfo>> out,
-        BoundedWindow window) {
+        BoundedWindow window)
+        throws IOException {
       String tableStringIdentifier = element.getKey();
       Iterable<FileWriteResult> fileWriteResults = element.getValue();
       if (!fileWriteResults.iterator().hasNext()) {
         return;
       }
 
       Table table = 
getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+
+      // vast majority of the time, we will simply append data files.
+      // in the rare case we get a batch that contains multiple partition 
specs, we will group
+      // data into manifest files and append.
+      // note: either way, we must use a single commit operation for atomicity.
+      if (containsMultiplePartitionSpecs(fileWriteResults)) {
+        appendManifestFiles(table, fileWriteResults);
+      } else {
+        appendDataFiles(table, fileWriteResults);
+      }
+
+      Snapshot snapshot = table.currentSnapshot();
+      LOG.info("Created new snapshot for table '{}': {}", 
tableStringIdentifier, snapshot);
+      snapshotsCreated.inc();
+      out.outputWithTimestamp(
+          KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), 
window.maxTimestamp());
+    }
+
+    // This works only when all files are using the same partition spec.
+    private void appendDataFiles(Table table, Iterable<FileWriteResult> 
fileWriteResults) {
       AppendFiles update = table.newAppend();
-      long numFiles = 0;
       for (FileWriteResult result : fileWriteResults) {
-        DataFile dataFile = result.getDataFile(table.spec());
+        DataFile dataFile = result.getDataFile(table.specs());
         update.appendFile(dataFile);
         committedDataFileByteSize.update(dataFile.fileSizeInBytes());
         committedDataFileRecordCount.update(dataFile.recordCount());
-        numFiles++;
       }
-      // this commit will create a ManifestFile. we don't need to manually 
create one.
       update.commit();
-      dataFilesCommitted.inc(numFiles);
+    }
+
+    // When a user updates their table partition spec during runtime, we can 
end up with
+    // a batch of files where some are written with the old spec and some are 
written with the new
+    // spec.
+    // A table commit is limited to a single partition spec.
+    // To handle this, we create a manifest file for each partition spec, and 
group data files
+    // accordingly.
+    // Afterward, we append all manifests using a single commit operation.
+    private void appendManifestFiles(Table table, Iterable<FileWriteResult> 
fileWriteResults)

Review Comment:
   this is a very long method 😲 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to