This is an automated email from the ASF dual-hosted git repository.
findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b17d1c9abd Core: Remove reflection from TestParallelIterable (#10857)
b17d1c9abd is described below
commit b17d1c9abdb8fbd668ac02194cadd6003c3e37f7
Author: Piotr Findeisen <[email protected]>
AuthorDate: Fri Aug 2 20:44:48 2024 +0200
Core: Remove reflection from TestParallelIterable (#10857)
This is a unit test, so can leverage package-private access.
---
.../org/apache/iceberg/util/ParallelIterable.java | 9 ++++-
.../apache/iceberg/util/TestParallelIterable.java | 41 +++++++++-------------
2 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 6486bd7fd4..16fa6f3d85 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.Closer;
@@ -77,7 +78,8 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
return iter;
}
- private static class ParallelIterator<T> implements CloseableIterator<T> {
+ @VisibleForTesting
+ static class ParallelIterator<T> implements CloseableIterator<T> {
private final Iterator<Task<T>> tasks;
private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
private final ExecutorService workerPool;
@@ -229,6 +231,11 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
}
return queue.poll();
}
+
+ @VisibleForTesting
+ int queueSize() {
+ return queue.size();
+ }
}
private static class Task<T> implements Supplier<Optional<Task<T>>>,
Closeable {
diff --git
a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 4910732f6e..c259bbd0a7 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -21,12 +21,9 @@ package org.apache.iceberg.util;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -40,6 +37,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
+import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
@@ -64,20 +62,17 @@ public class TestParallelIterable {
});
ParallelIterable<Integer> parallelIterable = new
ParallelIterable<>(transform, executor);
- CloseableIterator<Integer> iterator = parallelIterable.iterator();
- Field queueField = iterator.getClass().getDeclaredField("queue");
- queueField.setAccessible(true);
- ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>)
queueField.get(iterator);
+ ParallelIterator<Integer> iterator = (ParallelIterator<Integer>)
parallelIterable.iterator();
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
Awaitility.await("Queue is populated")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> queueHasElements(iterator, queue));
+ .untilAsserted(() -> queueHasElements(iterator));
iterator.close();
Awaitility.await("Queue is cleared")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> assertThat(queue).isEmpty());
+ .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0));
}
@Test
@@ -124,20 +119,21 @@ public class TestParallelIterable {
});
ParallelIterable<Integer> parallelIterable = new
ParallelIterable<>(transform, executor);
- CloseableIterator<Integer> iterator = parallelIterable.iterator();
- Field queueField = iterator.getClass().getDeclaredField("queue");
- queueField.setAccessible(true);
- ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>)
queueField.get(iterator);
+ ParallelIterator<Integer> iterator = (ParallelIterator<Integer>)
parallelIterable.iterator();
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
Awaitility.await("Queue is populated")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> queueHasElements(iterator, queue));
+ .untilAsserted(() -> queueHasElements(iterator));
iterator.close();
Awaitility.await("Queue is cleared")
.atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> assertThat(queue).as("Queue is not empty after
cleaning").isEmpty());
+ .untilAsserted(
+ () ->
+ assertThat(iterator.queueSize())
+ .as("Queue is not empty after cleaning")
+ .isEqualTo(0));
}
@Test
@@ -159,17 +155,14 @@ public class TestParallelIterable {
ExecutorService executor = Executors.newCachedThreadPool();
ParallelIterable<Integer> parallelIterable =
new ParallelIterable<>(iterables, executor, maxQueueSize);
- CloseableIterator<Integer> iterator = parallelIterable.iterator();
- Field queueField = iterator.getClass().getDeclaredField("queue");
- queueField.setAccessible(true);
- ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>)
queueField.get(iterator);
+ ParallelIterator<Integer> iterator = (ParallelIterator<Integer>)
parallelIterable.iterator();
Multiset<Integer> actualValues = HashMultiset.create();
while (iterator.hasNext()) {
- assertThat(queue)
- .as("iterator internal queue")
- .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size());
+ assertThat(iterator.queueSize())
+ .as("iterator internal queue size")
+ .isLessThanOrEqualTo(maxQueueSize + iterables.size());
actualValues.add(iterator.next());
}
@@ -181,9 +174,9 @@ public class TestParallelIterable {
executor.shutdownNow();
}
- private void queueHasElements(CloseableIterator<Integer> iterator, Queue
queue) {
+ private void queueHasElements(ParallelIterator<Integer> iterator) {
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
- assertThat(queue).isNotEmpty();
+ assertThat(iterator.queueSize()).as("queue size").isGreaterThan(0);
}
}