sanha closed pull request #67: [NEMO-54] Handle remote data fetch failures due
to executor removal
URL: https://github.com/apache/incubator-nemo/pull/67
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
index a2fcd5eca..e57de5ba3 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
@@ -139,12 +139,13 @@ void onContextClose() {
@Override
public void onChannelError(@Nullable final Throwable cause) {
setChannelError(cause);
- if (cause == null) {
- completedFuture.cancel(false);
- } else {
- completedFuture.completeExceptionally(cause);
+
+ if (currentByteBufInputStream != null) {
+ currentByteBufInputStream.byteBufQueue.closeExceptionally(cause);
}
- onContextClose();
+ byteBufInputStreams.closeExceptionally(cause);
+ completedFuture.completeExceptionally(cause);
+ deregister();
}
/**
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
index af718c3f6..872b275a2 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
@@ -30,6 +30,7 @@
private final Queue<T> queue;
private volatile boolean closed = false;
+ private volatile Throwable throwable = null;
/**
* Creates a closable blocking queue.
@@ -74,6 +75,14 @@ public synchronized void close() {
notifyAll();
}
+ /**
+ * Mark the input end of this queue as closed because of an error; later take()/peek() calls throw it wrapped in a RuntimeException.
+ */
+ public synchronized void closeExceptionally(final Throwable throwableToSet) {
+ this.throwable = throwableToSet;
+ close();
+ }
+
/**
* Retrieves and removes the head of this queue, waiting if necessary.
*
@@ -82,6 +91,10 @@ public synchronized void close() {
*/
@Nullable
public synchronized T take() throws InterruptedException {
+ if (throwable != null) {
+ throw new RuntimeException(throwable);
+ }
+
while (queue.isEmpty() && !closed) {
wait();
}
@@ -97,6 +110,10 @@ public synchronized T take() throws InterruptedException {
*/
@Nullable
public synchronized T peek() throws InterruptedException {
+ if (throwable != null) {
+ throw new RuntimeException(throwable);
+ }
+
while (queue.isEmpty() && !closed) {
wait();
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 144ded08d..c3d41f72c 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -469,6 +469,7 @@ public void run() {
* @param inputContext {@link ByteInputContext}
*/
public void onInputContext(final ByteInputContext inputContext) {
+ throw new IllegalStateException("No logic here");
}
/**
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 53c03f117..1f4276add 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.executor.task;
-import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -37,7 +36,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(ParentTaskDataFetcher.class);
private final InputReader readersForParentTask;
- private final LinkedBlockingQueue<DataUtil.IteratorWithNumBytes> dataQueue;
+ private final LinkedBlockingQueue iteratorQueue;
// Non-finals (lazy fetching)
private boolean hasFetchStarted;
@@ -54,7 +53,8 @@
super(dataSource, child, metricMap,
readerForParentTask.isSideInputReader(), isToSideInput);
this.readersForParentTask = readerForParentTask;
this.hasFetchStarted = false;
- this.dataQueue = new LinkedBlockingQueue<>();
+ this.currentIteratorIndex = 0;
+ this.iteratorQueue = new LinkedBlockingQueue<>();
}
private void handleMetric(final DataUtil.IteratorWithNumBytes iterator) {
@@ -88,15 +88,15 @@ private void fetchInBackground() {
this.expectedNumOfIterators = futures.size();
futures.forEach(compFuture -> compFuture.whenComplete((iterator,
exception) -> {
- if (exception != null) {
- throw new BlockFetchException(exception);
- }
-
try {
- dataQueue.put(iterator); // can block here
+ if (exception != null) {
+ iteratorQueue.put(exception); // can block here
+ } else {
+ iteratorQueue.put(iterator); // can block here
+ }
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
- throw new BlockFetchException(e);
+ throw new RuntimeException(e); // This shouldn't happen
}
}));
}
@@ -106,18 +106,16 @@ Object fetchDataElement() throws IOException {
try {
if (!hasFetchStarted) {
fetchInBackground();
- hasFetchStarted = true;
- this.currentIterator = dataQueue.take();
- this.currentIteratorIndex = 1;
+ advanceIterator();
}
if (this.currentIterator.hasNext()) {
+ // This iterator has an element available
noElementAtAll = false;
return this.currentIterator.next();
} else {
- // This iterator is done, proceed to the next iterator
if (currentIteratorIndex == expectedNumOfIterators) {
- // No more iterator left
+ // Entire fetcher is done
if (noElementAtAll) {
// This shouldn't normally happen, except for cases such as when Beam's VoidCoder is used.
noElementAtAll = false;
@@ -127,16 +125,39 @@ Object fetchDataElement() throws IOException {
return null;
}
} else {
+ // Advance to the next one
handleMetric(currentIterator);
- // Try the next iterator
- this.currentIteratorIndex += 1;
- this.currentIterator = dataQueue.take();
+ advanceIterator();
return fetchDataElement();
}
}
- } catch (InterruptedException exception) {
- Thread.currentThread().interrupt();
- throw new IOException(exception);
+ } catch (final Throwable e) {
+ // Any failure is caught and thrown as an IOException, so that the task is retried.
+ // In particular, we catch unchecked exceptions like RuntimeException thrown by DataUtil.IteratorWithNumBytes
+ // when remote data fetching fails for whatever reason.
+ // Note that we rely on unchecked exceptions because the Iterator interface does not provide the standard
+ // "throws Exception" that the TaskExecutor thread can catch and handle.
+ throw new IOException(e);
+ }
+ }
+
+ private void advanceIterator() throws Throwable {
+ // Take from iteratorQueue
+ final Object iteratorOrThrowable;
+ try {
+ iteratorOrThrowable = iteratorQueue.take();
+ } catch (InterruptedException e) {
+ throw e;
+ }
+
+ // Handle iteratorOrThrowable
+ if (iteratorOrThrowable instanceof Throwable) {
+ throw (Throwable) iteratorOrThrowable;
+ } else {
+ // This iterator is valid. Do advance.
+ hasFetchStarted = true;
+ this.currentIterator = (DataUtil.IteratorWithNumBytes)
iteratorOrThrowable;
+ this.currentIteratorIndex++;
}
}
}
diff --git
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
new file mode 100644
index 000000000..e2b3826d3
--- /dev/null
+++
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.task;
+
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.executor.data.DataUtil;
+import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link ParentTaskDataFetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({InputReader.class, VertexHarness.class})
+public final class ParentTaskDataFetcherTest {
+
+ @Test(timeout=5000)
+ public void testEmpty() throws Exception {
+ // InputReader
+ final List<String> dataElements = new ArrayList<>(0); // empty data
+ final InputReader inputReader =
generateInputReader(generateCompletableFuture(dataElements.iterator()));
+
+ // Fetcher
+ final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+
+ // Should return Void.TYPE
+ assertEquals(Void.TYPE, fetcher.fetchDataElement());
+ }
+
+ @Test(timeout=5000)
+ public void testNonEmpty() throws Exception {
+ // InputReader
+ final String singleData = "Single";
+ final List<String> dataElements = new ArrayList<>(1);
+ dataElements.add(singleData); // Single element
+ final InputReader inputReader =
generateInputReader(generateCompletableFuture(dataElements.iterator()));
+
+ // Fetcher
+ final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+
+ // Should return only a single element
+ assertEquals(singleData, fetcher.fetchDataElement());
+ assertEquals(null, fetcher.fetchDataElement());
+ }
+
+ @Test(timeout=5000, expected = IOException.class)
+ public void testErrorWhenRPC() throws Exception {
+ // Failing future
+ final CompletableFuture failingFuture = CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep(2 * 1000); // Block the fetcher for 2 seconds
+ throw new RuntimeException(); // Fail this future
+ } catch (InterruptedException e) {
+ // This shouldn't happen.
+ // We don't throw anything here, so that IOException does not occur and the test fails
+ }
+ }, Executors.newSingleThreadExecutor());
+ final InputReader inputReader = generateInputReader(failingFuture);
+
+ // Fetcher
+ final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+
+ // Should throw an IOException
+ fetcher.fetchDataElement(); // checked by 'expected = IOException.class'
+ assertTrue(failingFuture.isCompletedExceptionally());
+ }
+
+ @Test(timeout=5000, expected = IOException.class)
+ public void testErrorWhenReadingData() throws Exception {
+ // Failed iterator
+ final InputReader inputReader =
generateInputReader(generateCompletableFuture(new FailedIterator()));
+
+ // Fetcher
+ final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+
+ // Should throw an IOException
+ fetcher.fetchDataElement(); // checked by 'expected = IOException.class'
+ }
+
+ private ParentTaskDataFetcher createFetcher(final InputReader
readerForParentTask) {
+ return new ParentTaskDataFetcher(
+ mock(IRVertex.class),
+ readerForParentTask, // This is the only argument that affects the
behavior of ParentTaskDataFetcher
+ mock(VertexHarness.class),
+ new HashMap<>(0),
+ false);
+ }
+
+ private InputReader generateInputReader(final CompletableFuture
completableFuture) {
+ final InputReader inputReader = mock(InputReader.class);
+ when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
+ return inputReader;
+ }
+
+ private CompletableFuture generateCompletableFuture(final Iterator iterator)
{
+ return
CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(iterator));
+ }
+
+ private class FailedIterator implements Iterator {
+ @Override
+ public boolean hasNext() {
+ throw new RuntimeException("Fail");
+ }
+
+ @Override
+ public Object next() {
+ throw new RuntimeException("Fail");
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services