This is an automated email from the ASF dual-hosted git repository.

findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b17d1c9abd Core: Remove reflection from TestParallelIterable (#10857)
b17d1c9abd is described below

commit b17d1c9abdb8fbd668ac02194cadd6003c3e37f7
Author: Piotr Findeisen <[email protected]>
AuthorDate: Fri Aug 2 20:44:48 2024 +0200

    Core: Remove reflection from TestParallelIterable (#10857)
    
    This is a unit test, so can leverage package-private access.
---
 .../org/apache/iceberg/util/ParallelIterable.java  |  9 ++++-
 .../apache/iceberg/util/TestParallelIterable.java  | 41 +++++++++-------------
 2 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java 
b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 6486bd7fd4..16fa6f3d85 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.io.Closer;
@@ -77,7 +78,8 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
     return iter;
   }
 
-  private static class ParallelIterator<T> implements CloseableIterator<T> {
+  @VisibleForTesting
+  static class ParallelIterator<T> implements CloseableIterator<T> {
     private final Iterator<Task<T>> tasks;
     private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
     private final ExecutorService workerPool;
@@ -229,6 +231,11 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
       }
       return queue.poll();
     }
+
+    @VisibleForTesting
+    int queueSize() {
+      return queue.size();
+    }
   }
 
   private static class Task<T> implements Supplier<Optional<Task<T>>>, 
Closeable {
diff --git 
a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java 
b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 4910732f6e..c259bbd0a7 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -21,12 +21,9 @@ package org.apache.iceberg.util;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +37,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
+import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
@@ -64,20 +62,17 @@ public class TestParallelIterable {
                 });
 
     ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(transform, executor);
-    CloseableIterator<Integer> iterator = parallelIterable.iterator();
-    Field queueField = iterator.getClass().getDeclaredField("queue");
-    queueField.setAccessible(true);
-    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) 
queueField.get(iterator);
+    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
 
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();
     Awaitility.await("Queue is populated")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> queueHasElements(iterator, queue));
+        .untilAsserted(() -> queueHasElements(iterator));
     iterator.close();
     Awaitility.await("Queue is cleared")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> assertThat(queue).isEmpty());
+        .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0));
   }
 
   @Test
@@ -124,20 +119,21 @@ public class TestParallelIterable {
                 });
 
     ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(transform, executor);
-    CloseableIterator<Integer> iterator = parallelIterable.iterator();
-    Field queueField = iterator.getClass().getDeclaredField("queue");
-    queueField.setAccessible(true);
-    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) 
queueField.get(iterator);
+    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
 
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();
     Awaitility.await("Queue is populated")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> queueHasElements(iterator, queue));
+        .untilAsserted(() -> queueHasElements(iterator));
     iterator.close();
     Awaitility.await("Queue is cleared")
         .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> assertThat(queue).as("Queue is not empty after 
cleaning").isEmpty());
+        .untilAsserted(
+            () ->
+                assertThat(iterator.queueSize())
+                    .as("Queue is not empty after cleaning")
+                    .isEqualTo(0));
   }
 
   @Test
@@ -159,17 +155,14 @@ public class TestParallelIterable {
     ExecutorService executor = Executors.newCachedThreadPool();
     ParallelIterable<Integer> parallelIterable =
         new ParallelIterable<>(iterables, executor, maxQueueSize);
-    CloseableIterator<Integer> iterator = parallelIterable.iterator();
-    Field queueField = iterator.getClass().getDeclaredField("queue");
-    queueField.setAccessible(true);
-    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) 
queueField.get(iterator);
+    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
 
     Multiset<Integer> actualValues = HashMultiset.create();
 
     while (iterator.hasNext()) {
-      assertThat(queue)
-          .as("iterator internal queue")
-          .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size());
+      assertThat(iterator.queueSize())
+          .as("iterator internal queue size")
+          .isLessThanOrEqualTo(maxQueueSize + iterables.size());
       actualValues.add(iterator.next());
     }
 
@@ -181,9 +174,9 @@ public class TestParallelIterable {
     executor.shutdownNow();
   }
 
-  private void queueHasElements(CloseableIterator<Integer> iterator, Queue 
queue) {
+  private void queueHasElements(ParallelIterator<Integer> iterator) {
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();
-    assertThat(queue).isNotEmpty();
+    assertThat(iterator.queueSize()).as("queue size").isGreaterThan(0);
   }
 }

Reply via email to