This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6580a0db15 Core: Create commit groups in commit service offer in RewriteDataFilesCommitManager (#6539)
6580a0db15 is described below
commit 6580a0db156c2cc761e4f094409a66eaacc6c4a1
Author: Prashant Singh <[email protected]>
AuthorDate: Thu Jan 12 14:49:13 2023 -0800
Core: Create commit groups in commit service offer in RewriteDataFilesCommitManager (#6539)
Allows for concurrent commits during Partial Progress enabled
RewriteDataFiles Actions
---
.../actions/RewriteDataFilesCommitManager.java | 66 ++++++++++++++--------
1 file changed, 44 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 416786d79b..11e1ecc5e2 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.actions;
import java.io.Closeable;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -136,6 +137,7 @@ public class RewriteDataFilesCommitManager {
public class CommitService implements Closeable {
private final ExecutorService committerService;
private final ConcurrentLinkedQueue<RewriteFileGroup> completedRewrites;
+ private final ConcurrentLinkedQueue<String> inProgressCommits;
private final List<RewriteFileGroup> committedRewrites;
private final int rewritesPerCommit;
private final AtomicBoolean running = new AtomicBoolean(false);
@@ -153,6 +155,7 @@ public class RewriteDataFilesCommitManager {
completedRewrites = Queues.newConcurrentLinkedQueue();
committedRewrites = Lists.newArrayList();
+ inProgressCommits = Queues.newConcurrentLinkedQueue();
}
/** Starts a single threaded executor service for handling file group commits. */
@@ -163,9 +166,9 @@ public class RewriteDataFilesCommitManager {
// Partial progress commit service
committerService.execute(
() -> {
- while (running.get() || completedRewrites.size() > 0) {
+ while (running.get() || completedRewrites.size() > 0 || inProgressCommits.size() > 0) {
try {
- if (completedRewrites.size() == 0) {
+ if (completedRewrites.size() == 0 && inProgressCommits.size() == 0) {
// Give other threads a chance to make progress
Thread.sleep(100);
}
@@ -174,32 +177,17 @@ public class RewriteDataFilesCommitManager {
throw new RuntimeException("Interrupted while processing commits", e);
}
- // Either we have a full commit group, or we have completed writing and need to commit
- // what is left over
- if (completedRewrites.size() >= rewritesPerCommit
- || (!running.get() && completedRewrites.size() > 0)) {
- Set<RewriteFileGroup> batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
- for (int i = 0; i < rewritesPerCommit && !completedRewrites.isEmpty(); i++) {
- batch.add(completedRewrites.poll());
- }
-
- try {
- commitOrClean(batch);
- committedRewrites.addAll(batch);
- } catch (Exception e) {
- LOG.error(
- "Failure during rewrite commit process, partial progress enabled. Ignoring",
- e);
- }
+ // commit whatever is left once done with writing.
+ if (!running.get() && completedRewrites.size() > 0) {
+ commitReadyCommitGroups();
}
}
});
}
/**
- * Places a file group in the queue to be asynchronously committed either when the queue has
- * enough elements to do a batch of size {@link #rewritesPerCommit} or the service has been
- * closed.
+ * Places a file group in the queue and commits a batch of file groups if {@link
+ * #rewritesPerCommit} number of file groups are present in the queue.
*
* @param group file group to eventually be committed
*/
@@ -208,6 +196,7 @@ public class RewriteDataFilesCommitManager {
Preconditions.checkState(
running.get(), "Cannot add rewrites to a service which has already been closed");
completedRewrites.add(group);
+ commitReadyCommitGroups();
}
/** Returns all File groups which have been committed */
@@ -264,5 +253,38 @@ public class RewriteDataFilesCommitManager {
"File groups offered after service was closed, "
+ "they were not successfully committed.");
}
+
+ private void commitReadyCommitGroups() {
+ Set<RewriteFileGroup> batch = null;
+ if (canCreateCommitGroup()) {
+ synchronized (completedRewrites) {
+ if (canCreateCommitGroup()) {
+ batch = Sets.newHashSetWithExpectedSize(rewritesPerCommit);
+ for (int i = 0; i < rewritesPerCommit && !completedRewrites.isEmpty(); i++) {
+ batch.add(completedRewrites.poll());
+ }
+ }
+ }
+ }
+
+ if (batch != null) {
+ String inProgressCommitToken = UUID.randomUUID().toString();
+ inProgressCommits.add(inProgressCommitToken);
+ try {
+ commitOrClean(batch);
+ committedRewrites.addAll(batch);
+ } catch (Exception e) {
+ LOG.error("Failure during rewrite commit process, partial progress enabled. Ignoring", e);
+ }
+ inProgressCommits.remove(inProgressCommitToken);
+ }
+ }
+
+ private boolean canCreateCommitGroup() {
+ // Either we have a full commit group, or we have completed writing and need to commit
+ // what is left over
+ return (completedRewrites.size() >= rewritesPerCommit)
+ || (!running.get() && completedRewrites.size() > 0);
+ }
}
}