This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new 847b4f9980 Core: Abort file groups should be under same lock as committerService (#7933) (#8060)
847b4f9980 is described below

commit 847b4f99808823b36c8743eae72b5d4b7301a6a7
Author: Xianyang Liu <[email protected]>
AuthorDate: Sat Jul 15 02:19:55 2023 +0800

    Core: Abort file groups should be under same lock as committerService (#7933) (#8060)
---
 .../apache/iceberg/actions/BaseCommitService.java  |  45 +++++--
 .../apache/iceberg/actions/TestCommitService.java  | 138 +++++++++++++++++++++
 2 files changed, 175 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
index 1f36f133d0..82390a25f5 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Queues;
@@ -49,13 +50,16 @@ import org.slf4j.LoggerFactory;
 abstract class BaseCommitService<T> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseCommitService.class);
 
+  public static final long TIMEOUT_IN_MS_DEFAULT = TimeUnit.MINUTES.toMillis(120);
+
   private final Table table;
   private final ExecutorService committerService;
   private final ConcurrentLinkedQueue<T> completedRewrites;
   private final ConcurrentLinkedQueue<String> inProgressCommits;
-  private final List<T> committedRewrites;
+  private final ConcurrentLinkedQueue<T> committedRewrites;
   private final int rewritesPerCommit;
   private final AtomicBoolean running = new AtomicBoolean(false);
+  private final long timeoutInMS;
 
   /**
    * Constructs a {@link BaseCommitService}
@@ -64,17 +68,30 @@ abstract class BaseCommitService<T> implements Closeable {
    * @param rewritesPerCommit number of file groups to include in a commit
    */
   BaseCommitService(Table table, int rewritesPerCommit) {
+    this(table, rewritesPerCommit, TIMEOUT_IN_MS_DEFAULT);
+  }
+
+  /**
+   * Constructs a {@link BaseCommitService}
+   *
+   * @param table table to perform commit on
+   * @param rewritesPerCommit number of file groups to include in a commit
+   * @param timeoutInMS The timeout to wait for commits to complete after all rewrite jobs have been
+   *     completed
+   */
+  BaseCommitService(Table table, int rewritesPerCommit, long timeoutInMS) {
     this.table = table;
     LOG.info(
         "Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
     this.rewritesPerCommit = rewritesPerCommit;
+    this.timeoutInMS = timeoutInMS;
 
     committerService =
         Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());
 
     completedRewrites = Queues.newConcurrentLinkedQueue();
-    committedRewrites = Lists.newArrayList();
+    committedRewrites = Queues.newConcurrentLinkedQueue();
     inProgressCommits = Queues.newConcurrentLinkedQueue();
   }
 
@@ -138,7 +155,7 @@ abstract class BaseCommitService<T> implements Closeable {
     Preconditions.checkState(
         committerService.isShutdown(),
         "Cannot get results from a service which has not been closed");
-    return committedRewrites;
+    return Lists.newArrayList(committedRewrites.iterator());
   }
 
   @Override
@@ -154,11 +171,13 @@ abstract class BaseCommitService<T> implements Closeable {
       // the commit pool to finish doing its commits to Iceberg State. In the case of partial
       // progress this should have been occurring simultaneously with rewrites, if not there should
       // be only a single commit operation.
-      if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
+      if (!committerService.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {
         LOG.warn(
-            "Commit operation did not complete within 120 minutes of the all files "
+            "Commit operation did not complete within {} minutes ({} ms) of the all files "
                 + "being rewritten. This may mean that some changes were not successfully committed to the "
-                + "table.");
+                + "table.",
+            TimeUnit.MILLISECONDS.toMinutes(timeoutInMS),
+            timeoutInMS);
         timeout = true;
       }
     } catch (InterruptedException e) {
@@ -169,7 +188,11 @@ abstract class BaseCommitService<T> implements Closeable {
 
     if (!completedRewrites.isEmpty() && timeout) {
       LOG.error("Attempting to cleanup uncommitted file groups");
-      completedRewrites.forEach(this::abortFileGroup);
+      synchronized (completedRewrites) {
+        while (!completedRewrites.isEmpty()) {
+          abortFileGroup(completedRewrites.poll());
+        }
+      }
     }
 
     Preconditions.checkArgument(
@@ -211,11 +234,17 @@ abstract class BaseCommitService<T> implements Closeable {
     }
   }
 
-  private boolean canCreateCommitGroup() {
+  @VisibleForTesting
+  boolean canCreateCommitGroup() {
     // Either we have a full commit group, or we have completed writing and need to commit
     // what is left over
     boolean fullCommitGroup = completedRewrites.size() >= rewritesPerCommit;
     boolean writingComplete = !running.get() && completedRewrites.size() > 0;
     return fullCommitGroup || writingComplete;
   }
+
+  @VisibleForTesting
+  boolean completedRewritesAllCommitted() {
+    return completedRewrites.isEmpty() && inProgressCommits.isEmpty();
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java b/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
new file mode 100644
index 0000000000..1aae648333
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Tasks;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+public class TestCommitService extends TableTestBase {
+
+  public TestCommitService() {
+    super(1);
+  }
+
+  @Test
+  public void testCommittedResultsCorrectly() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
+    commitService.start();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    int numberOfFileGroups = 100;
+    Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
+    commitService.close();
+
+    Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
+    Set<Integer> actual = Sets.newHashSet(commitService.results());
+    Assertions.assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testAbortFileGroupsAfterTimeout() {
+    CustomCommitService commitService = new CustomCommitService(table, 5, 200);
+    commitService.start();
+
+    // Add file groups [0-3] for commit.
+    // There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those
+    // file groups will be added to the completedRewrites queue.
+    // Now the queue has 4 file groups that need to commit.
+    for (int i = 0; i < 4; i++) {
+      commitService.offer(i);
+    }
+
+    // Add file groups [4-7] for commit
+    // These are gated to not be able to commit, so all those 4 file groups will be added to the
+    // queue as well.
+    // Now the queue has 8 file groups that need to commit.
+    CustomCommitService spyCommitService = spy(commitService);
+    doReturn(false).when(spyCommitService).canCreateCommitGroup();
+    for (int i = 4; i < 8; i++) {
+      spyCommitService.offer(i);
+    }
+
+    // close commitService.
+    // This allows committerService thread to start to commit the remaining file groups [0-7] in the
+    // completedRewrites queue. And also the main thread waits for the committerService thread to
+    // finish within a timeout.
+
+    // The committerService thread commits file groups [0-4]. These will wait a fixed duration to
+    // simulate timeout on the main thread, which then tries to abort file groups [5-7].
+    // This tests the race conditions, as the committerService is also trying to commit groups
+    // [5-7].
+    Assertions.assertThatThrownBy(commitService::close)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Timeout occurred when waiting for commits");
+
+    // Wait for the commitService to finish. Committed all file groups or aborted remaining file
+    // groups.
+    Awaitility.await()
+        .atMost(5, TimeUnit.SECONDS)
+        .pollInSameThread()
+        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
+    if (commitService.aborted.isEmpty()) {
+      // All file groups are committed
+      Assertions.assertThat(commitService.results())
+          .isEqualTo(ImmutableList.of(0, 1, 2, 3, 4, 5, 6, 7));
+    } else {
+      // File groups [5-7] are aborted
+      Assertions.assertThat(commitService.results())
+          .doesNotContainAnyElementsOf(commitService.aborted);
+      Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
+      Assertions.assertThat(commitService.aborted).isEqualTo(ImmutableSet.of(5, 6, 7));
+    }
+  }
+
+  private static class CustomCommitService extends BaseCommitService<Integer> {
+    private final Set<Integer> aborted = Sets.newConcurrentHashSet();
+
+    CustomCommitService(Table table, int rewritesPerCommit, int timeoutInSeconds) {
+      super(table, rewritesPerCommit, timeoutInSeconds);
+    }
+
+    @Override
+    protected void commitOrClean(Set<Integer> batch) {
+      try {
+        // Slightly longer than timeout
+        Thread.sleep(210);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    protected void abortFileGroup(Integer group) {
+      aborted.add(group);
+    }
+  }
+}

Reply via email to