amogh-jahagirdar commented on code in PR #16108:
URL: https://github.com/apache/iceberg/pull/16108#discussion_r3326982011


##########
core/src/main/java/org/apache/iceberg/SnapshotProducer.java:
##########
@@ -227,6 +239,14 @@ protected ExecutorService workerPool() {
     return workerPool;
   }
 
+  protected ExecutorService writePool() {
+    if (writePool == null) {
+      this.writePool = ThreadPools.getWorkerPool();
+    }
+
+    return writePool;

Review Comment:
   Should `writePool` by default just be set to `ThreadPools.getWorkerPool()` at the field level, and then just return `writePool` here?
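A minimal sketch of the field-level default suggested above. It assumes `ThreadPools.getWorkerPool()` hands back a shared pool that is cheap to obtain eagerly. `SHARED_WORKER_POOL` and the one-argument `writeManifestsWith` are hypothetical stand-ins so the example compiles without Iceberg; they are not the PR's code.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WritePoolDefaultSketch {
  // Hypothetical stand-in for ThreadPools.getWorkerPool(): one shared pool created once
  // per JVM, so reading it at field-initialization time costs nothing extra.
  static final ExecutorService SHARED_WORKER_POOL =
      Executors.newFixedThreadPool(
          2,
          runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
          });

  // Default assigned at the field level instead of lazily inside the getter.
  private ExecutorService writePool = SHARED_WORKER_POOL;

  // No null check needed: the field starts non-null and the setter rejects null.
  protected ExecutorService writePool() {
    return writePool;
  }

  // Simplified override path (hypothetical; the PR's writeManifestsWith also takes a parallelism).
  WritePoolDefaultSketch writeManifestsWith(ExecutorService executor) {
    this.writePool = Objects.requireNonNull(executor, "executor");
    return this;
  }
}
```

With the default set at construction, `writePool()` becomes a plain getter and the lazy null check disappears; the trade-off is that the shared pool is looked up even when a caller immediately overrides it.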



##########
api/src/main/java/org/apache/iceberg/SnapshotUpdate.java:
##########
@@ -60,6 +60,22 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
    */
   ThisT scanManifestsWith(ExecutorService executorService);
 
+  /**
+   * Use a particular executor to write manifests during commit with the specified parallelism. The
+   * default worker pool will be used by default.
+   *
+   * <p>The parallelism parameter controls how many manifest writers are used, which determines the
+   * number of manifest files produced. The executor provides the threads for parallel execution.

Review Comment:
   "which informs" or "which helps control" rather than "which determines"? It's not always 1:1, since there's internal logic to make sure there aren't too many small manifests.
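To illustrate why "determines" overstates the guarantee, here is a sketch of one way a writer count could be capped. The formula and the `minFilesPerWriter` threshold are hypothetical, not Iceberg's actual logic; the point is only that the requested parallelism acts as an upper bound.

```java
class ManifestWriterCountSketch {
  // Hypothetical rule, for illustration only: the requested parallelism is an upper bound,
  // each writer should get at least minFilesPerWriter files, and there is always one writer.
  static int writerCount(int parallelism, int fileCount, int minFilesPerWriter) {
    // Number of writers that would each receive a "full" group of files.
    int fullGroups = fileCount / minFilesPerWriter;
    return Math.max(1, Math.min(parallelism, fullGroups));
  }
}
```

Under this rule, appending 20 files with a requested parallelism of 8 still yields a single writer, so the javadoc should not promise a fixed number of manifest files.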



##########
core/src/test/java/org/apache/iceberg/TestMergeAppend.java:
##########
@@ -381,6 +376,83 @@ public void testAppendWithManifestScanExecutor() {
     assertThat(snapshot).isNotNull();
   }
 
+  @TestTemplate
+  public void testAppendWithWriteManifestsExecutor() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).isNull();
+    assertThat(base.lastSequenceNumber()).isEqualTo(0);
+    AtomicInteger commitThreadsIndex = new AtomicInteger(0);
+    Snapshot snapshot =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendFile(FILE_A)
+                .appendFile(FILE_B)
+                .writeManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }),
+                    1),
+            branch);
+    assertThat(commitThreadsIndex.get())
+        .as("Thread should be created in provided commit pool")
+        .isGreaterThan(0);
+    assertThat(snapshot).isNotNull();
+  }
+
+  @TestTemplate
+  public void testAppendWithSeparateScanAndWriteExecutors() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).isNull();
+    assertThat(base.lastSequenceNumber()).isEqualTo(0);
+    AtomicInteger scanThreadsIndex = new AtomicInteger(0);
+    AtomicInteger commitThreadsIndex = new AtomicInteger(0);
+    Snapshot snapshot =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendFile(FILE_A)
+                .appendFile(FILE_B)
+                .scanManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("scan-" + scanThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }))
+                .writeManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }),
+                    1),
+            branch);
+    assertThat(scanThreadsIndex.get())
+        .as("Thread should be created in provided scan pool")
+        .isGreaterThan(0);
+    assertThat(commitThreadsIndex.get())
+        .as("Thread should be created in provided commit pool")
+        .isGreaterThan(0);
+    assertThat(snapshot).isNotNull();

Review Comment:
   I know the test was added largely just to validate the setting of the pool, but I feel like we should assert more of the table state after the append, just like the other tests do.



##########
core/src/test/java/org/apache/iceberg/TestMergeAppend.java:
##########
@@ -381,6 +376,83 @@ public void testAppendWithManifestScanExecutor() {
     assertThat(snapshot).isNotNull();
   }
 
+  @TestTemplate
+  public void testAppendWithWriteManifestsExecutor() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).isNull();
+    assertThat(base.lastSequenceNumber()).isEqualTo(0);
+    AtomicInteger commitThreadsIndex = new AtomicInteger(0);
+    Snapshot snapshot =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendFile(FILE_A)
+                .appendFile(FILE_B)
+                .writeManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }),
+                    1),
+            branch);
+    assertThat(commitThreadsIndex.get())
+        .as("Thread should be created in provided commit pool")
+        .isGreaterThan(0);
+    assertThat(snapshot).isNotNull();
+  }
+
+  @TestTemplate
+  public void testAppendWithSeparateScanAndWriteExecutors() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).isNull();
+    assertThat(base.lastSequenceNumber()).isEqualTo(0);

Review Comment:
   Not sure we really need this, but if we want to keep it I'd recommend a helper like `assertEmptyTable()` or something similar.



##########
core/src/test/java/org/apache/iceberg/TestMergeAppend.java:
##########
@@ -381,6 +376,83 @@ public void testAppendWithManifestScanExecutor() {
     assertThat(snapshot).isNotNull();
   }
 
+  @TestTemplate
+  public void testAppendWithWriteManifestsExecutor() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).isNull();
+    assertThat(base.lastSequenceNumber()).isEqualTo(0);
+    AtomicInteger commitThreadsIndex = new AtomicInteger(0);
+    Snapshot snapshot =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendFile(FILE_A)
+                .appendFile(FILE_B)
+                .writeManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }),
+                    1),
+            branch);
+    assertThat(commitThreadsIndex.get())
+        .as("Thread should be created in provided commit pool")
+        .isGreaterThan(0);
+    assertThat(snapshot).isNotNull();
+  }
+
+  @TestTemplate
+  public void testAppendWithSeparateScanAndWriteExecutors() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    TableMetadata base = readMetadata();
+    assertThat(base.currentSnapshot()).isNull();
+    assertThat(base.lastSequenceNumber()).isEqualTo(0);
+    AtomicInteger scanThreadsIndex = new AtomicInteger(0);
+    AtomicInteger commitThreadsIndex = new AtomicInteger(0);
+    Snapshot snapshot =
+        commit(
+            table,
+            table
+                .newAppend()
+                .appendFile(FILE_A)
+                .appendFile(FILE_B)
+                .scanManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("scan-" + scanThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }))
+                .writeManifestsWith(
+                    Executors.newFixedThreadPool(
+                        1,
+                        runnable -> {
+                          Thread thread = new Thread(runnable);
+                          thread.setName("commit-" + commitThreadsIndex.getAndIncrement());
+                          thread.setDaemon(true);
+                          return thread;
+                        }),

Review Comment:
   Maybe a separate `newNamedExecutor` helper? That would make this less dense to read.
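A sketch of the `newNamedExecutor` helper suggested above. The signature (a thread-name prefix plus the counter the test asserts on) and the enclosing `NamedExecutors` class are assumptions; the body is the same single-thread daemon factory the tests currently inline.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class NamedExecutors {
  /**
   * Hypothetical test helper: a single-thread executor whose daemon threads are named
   * "prefix-N". The counter is incremented once for every thread the pool creates, so a
   * test can assert that the pool was actually used.
   */
  static ExecutorService newNamedExecutor(String prefix, AtomicInteger threadCounter) {
    return Executors.newFixedThreadPool(
        1,
        runnable -> {
          Thread thread = new Thread(runnable);
          thread.setName(prefix + "-" + threadCounter.getAndIncrement());
          thread.setDaemon(true);
          return thread;
        });
  }
}
```

The builder chain would then read `.scanManifestsWith(newNamedExecutor("scan", scanThreadsIndex))` and `.writeManifestsWith(newNamedExecutor("commit", commitThreadsIndex), 1)`, keeping the same `scan-N` / `commit-N` thread names as before.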



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
