This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 847b4f9980 Core: Abort file groups should be under same lock as committerService (#7933) (#8060)
847b4f9980 is described below
commit 847b4f99808823b36c8743eae72b5d4b7301a6a7
Author: Xianyang Liu <[email protected]>
AuthorDate: Sat Jul 15 02:19:55 2023 +0800
Core: Abort file groups should be under same lock as committerService (#7933) (#8060)
---
.../apache/iceberg/actions/BaseCommitService.java | 45 +++++--
.../apache/iceberg/actions/TestCommitService.java | 138 +++++++++++++++++++++
2 files changed, 175 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
index 1f36f133d0..82390a25f5 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
@@ -49,13 +50,16 @@ import org.slf4j.LoggerFactory;
abstract class BaseCommitService<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseCommitService.class);
+ public static final long TIMEOUT_IN_MS_DEFAULT = TimeUnit.MINUTES.toMillis(120);
+
private final Table table;
private final ExecutorService committerService;
private final ConcurrentLinkedQueue<T> completedRewrites;
private final ConcurrentLinkedQueue<String> inProgressCommits;
- private final List<T> committedRewrites;
+ private final ConcurrentLinkedQueue<T> committedRewrites;
private final int rewritesPerCommit;
private final AtomicBoolean running = new AtomicBoolean(false);
+ private final long timeoutInMS;
/**
* Constructs a {@link BaseCommitService}
@@ -64,17 +68,30 @@ abstract class BaseCommitService<T> implements Closeable {
* @param rewritesPerCommit number of file groups to include in a commit
*/
BaseCommitService(Table table, int rewritesPerCommit) {
+ this(table, rewritesPerCommit, TIMEOUT_IN_MS_DEFAULT);
+ }
+
+ /**
+ * Constructs a {@link BaseCommitService}
+ *
+ * @param table table to perform commit on
+ * @param rewritesPerCommit number of file groups to include in a commit
+ * @param timeoutInMS The timeout to wait for commits to complete after all rewrite jobs have been
+ * completed
+ */
+ BaseCommitService(Table table, int rewritesPerCommit, long timeoutInMS) {
this.table = table;
LOG.info(
"Creating commit service for table {} with {} groups per commit",
table, rewritesPerCommit);
this.rewritesPerCommit = rewritesPerCommit;
+ this.timeoutInMS = timeoutInMS;
committerService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
completedRewrites = Queues.newConcurrentLinkedQueue();
- committedRewrites = Lists.newArrayList();
+ committedRewrites = Queues.newConcurrentLinkedQueue();
inProgressCommits = Queues.newConcurrentLinkedQueue();
}
@@ -138,7 +155,7 @@ abstract class BaseCommitService<T> implements Closeable {
Preconditions.checkState(
committerService.isShutdown(),
"Cannot get results from a service which has not been closed");
- return committedRewrites;
+ return Lists.newArrayList(committedRewrites.iterator());
}
@Override
@@ -154,11 +171,13 @@ abstract class BaseCommitService<T> implements Closeable {
// the commit pool to finish doing its commits to Iceberg State. In the case of partial
// progress this should have been occurring simultaneously with rewrites, if not there should
// be only a single commit operation.
- if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
+ if (!committerService.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {
LOG.warn(
- "Commit operation did not complete within 120 minutes of the all files "
+ "Commit operation did not complete within {} minutes ({} ms) of the all files "
+ "being rewritten. This may mean that some changes were not successfully committed to the "
- + "table.");
+ + "table.",
+ TimeUnit.MILLISECONDS.toMinutes(timeoutInMS),
+ timeoutInMS);
timeout = true;
}
} catch (InterruptedException e) {
@@ -169,7 +188,11 @@ abstract class BaseCommitService<T> implements Closeable {
if (!completedRewrites.isEmpty() && timeout) {
LOG.error("Attempting to cleanup uncommitted file groups");
- completedRewrites.forEach(this::abortFileGroup);
+ synchronized (completedRewrites) {
+ while (!completedRewrites.isEmpty()) {
+ abortFileGroup(completedRewrites.poll());
+ }
+ }
}
Preconditions.checkArgument(
@@ -211,11 +234,17 @@ abstract class BaseCommitService<T> implements Closeable {
}
}
- private boolean canCreateCommitGroup() {
+ @VisibleForTesting
+ boolean canCreateCommitGroup() {
// Either we have a full commit group, or we have completed writing and need to commit
// what is left over
boolean fullCommitGroup = completedRewrites.size() >= rewritesPerCommit;
boolean writingComplete = !running.get() && completedRewrites.size() > 0;
return fullCommitGroup || writingComplete;
}
+
+ @VisibleForTesting
+ boolean completedRewritesAllCommitted() {
+ return completedRewrites.isEmpty() && inProgressCommits.isEmpty();
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java b/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
new file mode 100644
index 0000000000..1aae648333
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+ public TestCommitService() {
+ super(1);
+ }
+
+ @Test
+ public void testCommittedResultsCorrectly() {
+ CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+ commitService.start();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ int numberOfFileGroups = 100;
+ Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+ commitService.close();
+
+ Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+ Set<Integer> actual = Sets.newHashSet(commitService.results());
+ Assertions.assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAbortFileGroupsAfterTimeout() {
+ CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+ commitService.start();
+
+ // Add file groups [0-3] for commit.
+ // There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those
+ // file groups will be added to the completedRewrites queue.
+ // Now the queue has 4 file groups that need to commit.
+ for (int i = 0; i < 4; i++) {
+ commitService.offer(i);
+ }
+
+ // Add file groups [4-7] for commit
+ // These are gated to not be able to commit, so all those 4 file groups will be added to the
+ // queue as well.
+ // Now the queue has 8 file groups that need to commit.
+ CustomCommitService spyCommitService = spy(commitService);
+ doReturn(false).when(spyCommitService).canCreateCommitGroup();
+ for (int i = 4; i < 8; i++) {
+ spyCommitService.offer(i);
+ }
+
+ // close commitService.
+ // This allows committerService thread to start to commit the remaining file groups [0-7] in the
+ // completedRewrites queue. And also the main thread waits for the committerService thread to
+ // finish within a timeout.
+
+ // The committerService thread commits file groups [0-4]. These will wait a fixed duration to
+ // simulate timeout on the main thread, which then tries to abort file groups [5-7].
+ // This tests the race conditions, as the committerService is also trying to commit groups
+ // [5-7].
+ Assertions.assertThatThrownBy(commitService::close)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Timeout occurred when waiting for commits");
+
+ // Wait for the commitService to finish. Committed all file groups or aborted remaining file
+ // groups.
+ Awaitility.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .pollInSameThread()
+ .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
+ if (commitService.aborted.isEmpty()) {
+ // All file groups are committed
+ Assertions.assertThat(commitService.results())
+ .isEqualTo(ImmutableList.of(0, 1, 2, 3, 4, 5, 6, 7));
+ } else {
+ // File groups [5-7] are aborted
+ Assertions.assertThat(commitService.results())
+ .doesNotContainAnyElementsOf(commitService.aborted);
+ Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
+ Assertions.assertThat(commitService.aborted).isEqualTo(ImmutableSet.of(5, 6, 7));
+ }
+ }
+
+ private static class CustomCommitService extends BaseCommitService<Integer> {
+ private final Set<Integer> aborted = Sets.newConcurrentHashSet();
+
+ CustomCommitService(Table table, int rewritesPerCommit, int timeoutInSeconds) {
+ super(table, rewritesPerCommit, timeoutInSeconds);
+ }
+
+ @Override
+ protected void commitOrClean(Set<Integer> batch) {
+ try {
+ // Slightly longer than timeout
+ Thread.sleep(210);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void abortFileGroup(Integer group) {
+ aborted.add(group);
+ }
+ }
+}