This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4edf10c throw failure with iteratorexecutor when allocating files (#3321)
4edf10c is described below
commit 4edf10c3e112c213aa3dcb3299470eadd1cd61ce
Author: William Lo <[email protected]>
AuthorDate: Mon Jul 12 17:36:48 2021 -0700
throw failure with iteratorexecutor when allocating files (#3321)
---
.../PriorityIterableBasedRequestAllocator.java | 3 ++-
.../BruteForceAllocatorTest.java | 23 ++++++++++++++++++++++
.../request_allocation/GreedyAllocatorTest.java | 22 +++++++++++++++++++++
.../request_allocation/PreOrderAllocatorTest.java | 23 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
index f5be73e..be58e6d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/request_allocation/PriorityIterableBasedRequestAllocator.java
@@ -91,7 +91,8 @@ public abstract class PriorityIterableBasedRequestAllocator<T
extends Request<T>
try {
List<Either<Void, ExecutionException>> results =
executor.executeAndGetResults();
- IteratorExecutor.logFailures(results, log, 10);
+ // Throw runtime failure if an exception occurs during execution to fail the job
+ IteratorExecutor.logAndThrowFailures(results, log, 10);
} catch (InterruptedException ie) {
log.error("Request allocation was interrupted.");
return new AllocatedRequestsIteratorBase<>(
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
index 1243cd1..f5e7f20 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/BruteForceAllocatorTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import static org.mockito.Mockito.*;
public class BruteForceAllocatorTest {
@Test
@@ -53,4 +54,26 @@ public class BruteForceAllocatorTest {
Assert.assertEquals(resultList.get(2).getString(), "d-30");
Assert.assertEquals(resultList.get(3).getString(), "e-20");
}
+
+ @Test
+ public void testThrowExceptionOnFailure() throws Exception {
+ ResourceEstimator<StringRequest> failingEstimator =
mock(ResourceEstimator.class);
+ when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new
RuntimeException("Error"));
+
+ RequestAllocatorConfig<StringRequest> configuration =
+ RequestAllocatorConfig.builder(failingEstimator)
+ .allowParallelization()
+ .withPrioritizer(new
StringRequest.StringRequestComparator()).build();
+ BruteForceAllocator<StringRequest> allocator =
+ new BruteForceAllocator<>(configuration);
+
+ ResourcePool pool =
ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+ List<Requestor<StringRequest>> requests =
Lists.<Requestor<StringRequest>>newArrayList(
+ new StringRequestor("r1", "a-50", "f-50", "k-20"),
+ new StringRequestor("r2", "j-10", "b-20", "e-20"),
+ new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+ Assert.expectThrows(RuntimeException.class, () ->
allocator.allocateRequests(requests.iterator(), pool));
+ }
}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
index c1d52d6..ac293d3 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/GreedyAllocatorTest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import static org.mockito.Mockito.*;
public class GreedyAllocatorTest {
@@ -58,4 +59,25 @@ public class GreedyAllocatorTest {
})), Sets.newHashSet("a-50", "f-50"));
}
+ @Test
+ public void testThrowExceptionOnFailure() throws Exception {
+ ResourceEstimator<StringRequest> failingEstimator =
mock(ResourceEstimator.class);
+ when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new
RuntimeException("Error"));
+
+ RequestAllocatorConfig<StringRequest> configuration =
+ RequestAllocatorConfig.builder(failingEstimator)
+ .allowParallelization()
+ .withPrioritizer(new
StringRequest.StringRequestComparator()).build();
+ GreedyAllocator<StringRequest> allocator = new
GreedyAllocator<>(configuration);
+
+ ResourcePool pool =
ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+ List<Requestor<StringRequest>> requests =
Lists.<Requestor<StringRequest>>newArrayList(
+ new StringRequestor("r1", "a-50", "f-50", "k-20"),
+ new StringRequestor("r2", "j-10", "b-20", "e-20"),
+ new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+ Assert.expectThrows(RuntimeException.class, () ->
allocator.allocateRequests(requests.iterator(), pool));
+ }
+
}
\ No newline at end of file
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
index 38d5d99..a27970a 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/request_allocation/PreOrderAllocatorTest.java
@@ -24,6 +24,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import static org.mockito.Mockito.*;
public class PreOrderAllocatorTest {
@@ -61,4 +62,26 @@ public class PreOrderAllocatorTest {
Assert.assertFalse(estimator.getQueriedRequests().contains("f-50"));
}
+ @Test
+ public void testThrowExceptionOnFailure() throws Exception {
+ ResourceEstimator<StringRequest> failingEstimator =
mock(ResourceEstimator.class);
+ when(failingEstimator.estimateRequirement(any(), any())).thenThrow(new
RuntimeException("Error"));
+
+ RequestAllocatorConfig<StringRequest> configuration =
+ RequestAllocatorConfig.builder(failingEstimator)
+ .allowParallelization()
+ .withPrioritizer(new
StringRequest.StringRequestComparator()).build();
+ PreOrderAllocator<StringRequest> allocator =
+ new PreOrderAllocator<>(configuration);
+
+ ResourcePool pool =
ResourcePool.builder().maxResource(StringRequest.MEMORY, 100.).build();
+
+ List<Requestor<StringRequest>> requests =
Lists.<Requestor<StringRequest>>newArrayList(
+ new StringRequestor("r1", "a-50", "f-50", "k-20"),
+ new StringRequestor("r2", "j-10", "b-20", "e-20"),
+ new StringRequestor("r3", "g-20", "c-200", "d-30"));
+
+ Assert.expectThrows(RuntimeException.class, () ->
allocator.allocateRequests(requests.iterator(), pool));
+ }
+
}
\ No newline at end of file