DanielMorales9 commented on code in PR #32879:
URL: https://github.com/apache/beam/pull/32879#discussion_r1816379196


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########
@@ -97,36 +111,103 @@ private Catalog getCatalog() {
       return catalog;
     }
 
+    private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWriteResults) {
+      int id = fileWriteResults.iterator().next().getSerializableDataFile().getPartitionSpecId();
+      for (FileWriteResult result : fileWriteResults) {
+        if (id != result.getSerializableDataFile().getPartitionSpecId()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @ProcessElement
     public void processElement(
         @Element KV<String, Iterable<FileWriteResult>> element,
         OutputReceiver<KV<String, SnapshotInfo>> out,
-        BoundedWindow window) {
+        BoundedWindow window)
+        throws IOException {
       String tableStringIdentifier = element.getKey();
       Iterable<FileWriteResult> fileWriteResults = element.getValue();
       if (!fileWriteResults.iterator().hasNext()) {
         return;
       }
 
       Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+
+      // vast majority of the time, we will simply append data files.
+      // in the rare case we get a batch that contains multiple partition specs, we will group
+      // data into manifest files and append.
+      // note: either way, we must use a single commit operation for atomicity.
+      if (containsMultiplePartitionSpecs(fileWriteResults)) {
+        appendManifestFiles(table, fileWriteResults);
+      } else {
+        appendDataFiles(table, fileWriteResults);

Review Comment:
   Can `appendManifestFiles` and `appendDataFiles` return the `AppendFiles` object, so that we call `commit()` once, here?
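   For illustration, a hypothetical, self-contained sketch of the suggested shape: each helper builds and returns the pending update without committing it, and the caller commits exactly once. `WrittenFile`, `PendingAppend`, and `AppendSketch` are made-up stand-ins (not Iceberg or Beam classes) so the sketch runs without dependencies; in the real code the helpers would presumably start from `table.newAppend()` and return the resulting `AppendFiles`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a written data file and the partition spec id it was written with.
final class WrittenFile {
  final String path;
  final int specId;

  WrittenFile(String path, int specId) {
    this.path = path;
    this.specId = specId;
  }
}

// Hypothetical stand-in for Iceberg's AppendFiles: it only accumulates changes,
// and may be committed once.
final class PendingAppend {
  private final List<String> files = new ArrayList<>();
  private final List<String> manifests = new ArrayList<>();
  private int commits = 0;

  PendingAppend appendFile(String path) {
    files.add(path);
    return this;
  }

  PendingAppend appendManifest(String manifest) {
    manifests.add(manifest);
    return this;
  }

  void commit() {
    if (commits > 0) {
      throw new IllegalStateException("update already committed");
    }
    commits++;
  }

  int commitCount() { return commits; }
  List<String> files() { return files; }
  List<String> manifests() { return manifests; }
}

final class AppendSketch {
  static boolean containsMultiplePartitionSpecs(List<WrittenFile> files) {
    int id = files.get(0).specId;
    for (WrittenFile f : files) {
      if (f.specId != id) {
        return true;
      }
    }
    return false;
  }

  // Common case: add each data file to the update; the caller commits it.
  static PendingAppend appendDataFiles(List<WrittenFile> files) {
    PendingAppend update = new PendingAppend(); // stands in for table.newAppend()
    for (WrittenFile f : files) {
      update.appendFile(f.path);
    }
    return update;
  }

  // Rare case: group files by spec id into one stand-in "manifest" per spec; the caller commits it.
  static PendingAppend appendManifestFiles(List<WrittenFile> files) {
    Map<Integer, List<String>> bySpec = new TreeMap<>();
    for (WrittenFile f : files) {
      bySpec.computeIfAbsent(f.specId, k -> new ArrayList<>()).add(f.path);
    }
    PendingAppend update = new PendingAppend(); // stands in for table.newAppend()
    for (Map.Entry<Integer, List<String>> e : bySpec.entrySet()) {
      update.appendManifest("spec-" + e.getKey() + ":" + e.getValue());
    }
    return update;
  }

  // Single commit point: both branches return an uncommitted update.
  static PendingAppend process(List<WrittenFile> files) {
    PendingAppend update =
        containsMultiplePartitionSpecs(files) ? appendManifestFiles(files) : appendDataFiles(files);
    update.commit();
    return update;
  }
}
```

   Keeping `commit()` at one call site makes the single-commit (atomicity) requirement from the code comment visible in one place, and the helpers become pure "build the update" functions.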



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
