This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6580a0db15 Core: Create commit groups in commit service offer in RewriteDataFilesCommitManager (#6539)
6580a0db15 is described below

commit 6580a0db156c2cc761e4f094409a66eaacc6c4a1
Author: Prashant Singh <[email protected]>
AuthorDate: Thu Jan 12 14:49:13 2023 -0800

    Core: Create commit groups in commit service offer in RewriteDataFilesCommitManager (#6539)
    
    Allows for concurrent commits during Partial Progress enabled RewriteDataFiles Actions
---
 .../actions/RewriteDataFilesCommitManager.java     | 66 ++++++++++++++--------
 1 file changed, 44 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 416786d79b..11e1ecc5e2 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.actions;
 import java.io.Closeable;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -136,6 +137,7 @@ public class RewriteDataFilesCommitManager {
   public class CommitService implements Closeable {
     private final ExecutorService committerService;
     private final ConcurrentLinkedQueue<RewriteFileGroup> completedRewrites;
+    private final ConcurrentLinkedQueue<String> inProgressCommits;
     private final List<RewriteFileGroup> committedRewrites;
     private final int rewritesPerCommit;
     private final AtomicBoolean running = new AtomicBoolean(false);
@@ -153,6 +155,7 @@ public class RewriteDataFilesCommitManager {
 
       completedRewrites = Queues.newConcurrentLinkedQueue();
       committedRewrites = Lists.newArrayList();
+      inProgressCommits = Queues.newConcurrentLinkedQueue();
     }
 
     /** Starts a single threaded executor service for handling file group 
commits. */
@@ -163,9 +166,9 @@ public class RewriteDataFilesCommitManager {
       // Partial progress commit service
       committerService.execute(
           () -> {
-            while (running.get() || completedRewrites.size() > 0) {
+            while (running.get() || completedRewrites.size() > 0 || 
inProgressCommits.size() > 0) {
               try {
-                if (completedRewrites.size() == 0) {
+                if (completedRewrites.size() == 0 && inProgressCommits.size() 
== 0) {
                   // Give other threads a chance to make progress
                   Thread.sleep(100);
                 }
@@ -174,32 +177,17 @@ public class RewriteDataFilesCommitManager {
                 throw new RuntimeException("Interrupted while processing 
commits", e);
               }
 
-              // Either we have a full commit group, or we have completed 
writing and need to commit
-              // what is left over
-              if (completedRewrites.size() >= rewritesPerCommit
-                  || (!running.get() && completedRewrites.size() > 0)) {
-                Set<RewriteFileGroup> batch = 
Sets.newHashSetWithExpectedSize(rewritesPerCommit);
-                for (int i = 0; i < rewritesPerCommit && 
!completedRewrites.isEmpty(); i++) {
-                  batch.add(completedRewrites.poll());
-                }
-
-                try {
-                  commitOrClean(batch);
-                  committedRewrites.addAll(batch);
-                } catch (Exception e) {
-                  LOG.error(
-                      "Failure during rewrite commit process, partial progress 
enabled. Ignoring",
-                      e);
-                }
+              // commit whatever is left once done with writing.
+              if (!running.get() && completedRewrites.size() > 0) {
+                commitReadyCommitGroups();
               }
             }
           });
     }
 
     /**
-     * Places a file group in the queue to be asynchronously committed either 
when the queue has
-     * enough elements to do a batch of size {@link #rewritesPerCommit} or the 
service has been
-     * closed.
+     * Places a file group in the queue and commits a batch of file groups if 
{@link
+     * #rewritesPerCommit} number of file groups are present in the queue.
      *
      * @param group file group to eventually be committed
      */
@@ -208,6 +196,7 @@ public class RewriteDataFilesCommitManager {
       Preconditions.checkState(
           running.get(), "Cannot add rewrites to a service which has already 
been closed");
       completedRewrites.add(group);
+      commitReadyCommitGroups();
     }
 
     /** Returns all File groups which have been committed */
@@ -264,5 +253,38 @@ public class RewriteDataFilesCommitManager {
           "File groups offered after service was closed, "
               + "they were not successfully committed.");
     }
+
+    private void commitReadyCommitGroups() {
+      Set<RewriteFileGroup> batch = null;
+      if (canCreateCommitGroup()) {
+        synchronized (completedRewrites) {
+          if (canCreateCommitGroup()) {
+            batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
+            for (int i = 0; i < rewritesPerCommit && 
!completedRewrites.isEmpty(); i++) {
+              batch.add(completedRewrites.poll());
+            }
+          }
+        }
+      }
+
+      if (batch != null) {
+        String inProgressCommitToken = UUID.randomUUID().toString();
+        inProgressCommits.add(inProgressCommitToken);
+        try {
+          commitOrClean(batch);
+          committedRewrites.addAll(batch);
+        } catch (Exception e) {
+          LOG.error("Failure during rewrite commit process, partial progress 
enabled. Ignoring", e);
+        }
+        inProgressCommits.remove(inProgressCommitToken);
+      }
+    }
+
+    private boolean canCreateCommitGroup() {
+      // Either we have a full commit group, or we have completed writing and 
need to commit
+      // what is left over
+      return (completedRewrites.size() >= rewritesPerCommit)
+          || (!running.get() && completedRewrites.size() > 0);
+    }
   }
 }

Reply via email to