seojangho closed pull request #93: [NEMO-160] Handle Beam VoidCoder properly
URL: https://github.com/apache/incubator-nemo/pull/93
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index 32b9352ff..e6433fd74 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -17,11 +17,7 @@
 
 import edu.snu.nemo.common.ir.OutputCollector;
 
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 
 /**
  * OutputCollector implementation.
@@ -29,92 +25,61 @@
  * @param <O> output type.
  */
 public final class OutputCollectorImpl<O> implements OutputCollector<O> {
-  private final Queue<O> mainTagOutputQueue;
-  private final Map<String, Queue<Object>> additionalTagOutputQueues;
-
-  /**
-   * Constructor of a new OutputCollectorImpl.
-   */
-  public OutputCollectorImpl() {
-    this.mainTagOutputQueue = new ArrayDeque<>(1);
-    this.additionalTagOutputQueues = new HashMap<>();
-  }
+  // Use ArrayList (not Queue) to allow 'null' values
+  private final ArrayList<O> mainTagElements;
+  private final Map<String, ArrayList<Object>> additionalTagElementsMap;
 
   /**
    * Constructor of a new OutputCollectorImpl with tagged outputs.
    * @param taggedChildren tagged children
    */
   public OutputCollectorImpl(final List<String> taggedChildren) {
-    this.mainTagOutputQueue = new ArrayDeque<>(1);
-    this.additionalTagOutputQueues = new HashMap<>();
-    taggedChildren.forEach(child -> this.additionalTagOutputQueues.put(child, 
new ArrayDeque<>(1)));
+    this.mainTagElements = new ArrayList<>(1);
+    this.additionalTagElementsMap = new HashMap<>();
+    taggedChildren.forEach(child -> this.additionalTagElementsMap.put(child, 
new ArrayList<>(1)));
   }
 
   @Override
   public void emit(final O output) {
-    mainTagOutputQueue.add(output);
+    mainTagElements.add(output);
   }
 
   @Override
   public <T> void emit(final String dstVertexId, final T output) {
-    if (this.additionalTagOutputQueues.get(dstVertexId) == null) {
+    if (this.additionalTagElementsMap.get(dstVertexId) == null) {
       // This dstVertexId is for the main tag
       emit((O) output);
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      this.additionalTagOutputQueues.get(dstVertexId).add(output);
+      this.additionalTagElementsMap.get(dstVertexId).add(output);
     }
   }
 
-  /**
-   * Inter-Task data is transferred from sender-side Task's OutputCollectorImpl
-   * to receiver-side Task.
-   *
-   * @return the first element of this list
-   */
-  public O remove() {
-    return mainTagOutputQueue.remove();
+  public Iterable<O> iterateMain() {
+    return mainTagElements;
   }
 
-  /**
-   * Inter-task data is transferred from sender-side Task's OutputCollectorImpl
-   * to receiver-side Task.
-   *
-   * @param tag output tag
-   * @return the first element of corresponding list
-   */
-  public Object remove(final String tag) {
-    if (this.additionalTagOutputQueues.get(tag) == null) {
+  public Iterable<Object> iterateTag(final String tag) {
+    if (this.additionalTagElementsMap.get(tag) == null) {
       // This dstVertexId is for the main tag
-      return remove();
+      return (Iterable<Object>) iterateMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      return this.additionalTagOutputQueues.get(tag).remove();
+      return this.additionalTagElementsMap.get(tag);
     }
-
   }
 
-  /**
-   * Check if this OutputCollector is empty.
-   *
-   * @return true if this OutputCollector is empty.
-   */
-  public boolean isEmpty() {
-    return mainTagOutputQueue.isEmpty();
+  public void clearMain() {
+    mainTagElements.clear();
   }
 
-  /**
-   * Check if this OutputCollector is empty.
-   *
-   * @param tag output tag
-   * @return true if this OutputCollector is empty.
-   */
-  public boolean isEmpty(final String tag) {
-    if (this.additionalTagOutputQueues.get(tag) == null) {
-      return isEmpty();
+  public void clearTag(final String tag) {
+    if (this.additionalTagElementsMap.get(tag) == null) {
+      // This dstVertexId is for the main tag
+      clearMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      return this.additionalTagOutputQueues.get(tag).isEmpty();
+      this.additionalTagElementsMap.get(tag).clear();
     }
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
index c7aeb0e65..bb80e1abf 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
@@ -40,9 +40,9 @@
 
   /**
    * Can block until the next data element becomes available.
-   *
-   * @return null if there's no more data element.
-   * @throws IOException while fetching data
+   * @return data element
+   * @throws IOException upon I/O error
+   * @throws java.util.NoSuchElementException if no more element is available
    */
   abstract Object fetchDataElement() throws IOException;
 
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 91c80d0e7..0193bae1e 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,8 +15,6 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
-import edu.snu.nemo.common.coder.DecoderFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -26,7 +24,7 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -41,11 +39,10 @@
   private final LinkedBlockingQueue iteratorQueue;
 
   // Non-finals (lazy fetching)
-  private boolean hasFetchStarted;
+  private boolean firstFetch;
   private int expectedNumOfIterators;
   private DataUtil.IteratorWithNumBytes currentIterator;
   private int currentIteratorIndex;
-  private boolean noElementAtAll = true;
   private long serBytes = 0;
   private long encodedBytes = 0;
 
@@ -55,85 +52,35 @@
                         final boolean isToSideInput) {
     super(dataSource, child, readerForParentTask.isSideInputReader(), 
isToSideInput);
     this.readersForParentTask = readerForParentTask;
-    this.hasFetchStarted = false;
+    this.firstFetch = true;
     this.currentIteratorIndex = 0;
     this.iteratorQueue = new LinkedBlockingQueue<>();
   }
 
-  private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
-    try {
-      serBytes += iterator.getNumSerializedBytes();
-    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException 
e) {
-      serBytes = -1;
-    } catch (final IllegalStateException e) {
-      LOG.error("Failed to get the number of bytes of serialized data - the 
data is not ready yet ", e);
-    }
-    try {
-      encodedBytes += iterator.getNumEncodedBytes();
-    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException 
e) {
-      encodedBytes = -1;
-    } catch (final IllegalStateException e) {
-      LOG.error("Failed to get the number of bytes of encoded data - the data 
is not ready yet ", e);
-    }
-  }
-
-  /**
-   * Blocking call.
-   */
-  private void fetchInBackground() {
-    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = 
readersForParentTask.read();
-    this.expectedNumOfIterators = futures.size();
-
-    futures.forEach(compFuture -> compFuture.whenComplete((iterator, 
exception) -> {
-      try {
-        if (exception != null) {
-          iteratorQueue.put(exception); // can block here
-        } else {
-          iteratorQueue.put(iterator); // can block here
-        }
-      } catch (final InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e); // This shouldn't happen
-      }
-    }));
-  }
-
   @Override
   Object fetchDataElement() throws IOException {
     try {
-      if (!hasFetchStarted) {
-        fetchInBackground();
+      if (firstFetch) {
+        fetchDataLazily();
         advanceIterator();
+        firstFetch = false;
       }
 
-      if (this.currentIterator.hasNext()) {
-        // This iterator has an element available
-        noElementAtAll = false;
-        return this.currentIterator.next();
-      } else {
-        if (currentIteratorIndex == expectedNumOfIterators) {
-          // Entire fetcher is done
-          if (noElementAtAll) {
-            final Optional<DecoderFactory> decoderFactory =
-                
readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class);
-
-            // TODO #173: Properly handle zero-element task outputs. Currently 
fetchDataElement relies on
-            // toString() method to distinguish whether to return Void.TYPE or 
not.
-            if (decoderFactory.get().toString().equals("VoidCoder")) {
-              noElementAtAll = false;
-              return Void.TYPE;
-            } else {
-              return null;
-            }
-          } else {
-            // This whole fetcher's done
-            return null;
-          }
-        } else {
-          // Advance to the next one
+      while (true) {
+        // This iterator has the element
+        if (this.currentIterator.hasNext()) {
+          return this.currentIterator.next();
+        }
+
+        // This iterator does not have the element
+        if (currentIteratorIndex < expectedNumOfIterators) {
+          // Next iterator has the element
           countBytes(currentIterator);
           advanceIterator();
-          return fetchDataElement();
+          continue;
+        } else {
+          // We've consumed all the iterators
+          break;
         }
       }
     } catch (final Throwable e) {
@@ -144,33 +91,71 @@ Object fetchDataElement() throws IOException {
       // "throw Exception" that the TaskExecutor thread can catch and handle.
       throw new IOException(e);
     }
+
+    // We throw the exception here, outside of the above try-catch region
+    throw new NoSuchElementException();
   }
 
-  private void advanceIterator() throws Throwable {
+  private void advanceIterator() throws IOException {
     // Take from iteratorQueue
     final Object iteratorOrThrowable;
     try {
-      iteratorOrThrowable = iteratorQueue.take();
+      iteratorOrThrowable = iteratorQueue.take(); // blocking call
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
     }
 
     // Handle iteratorOrThrowable
     if (iteratorOrThrowable instanceof Throwable) {
-      throw (Throwable) iteratorOrThrowable;
+      throw new IOException((Throwable) iteratorOrThrowable);
     } else {
       // This iterator is valid. Do advance.
-      hasFetchStarted = true;
       this.currentIterator = (DataUtil.IteratorWithNumBytes) 
iteratorOrThrowable;
       this.currentIteratorIndex++;
     }
   }
 
-  public final long getSerializedBytes() {
+  private void fetchDataLazily() throws IOException {
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = 
readersForParentTask.read();
+    this.expectedNumOfIterators = futures.size();
+
+    futures.forEach(compFuture -> compFuture.whenComplete((iterator, 
exception) -> {
+      try {
+        if (exception != null) {
+          iteratorQueue.put(exception);
+        } else {
+          iteratorQueue.put(iterator);
+        }
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e); // this should not happen
+      }
+    }));
+  }
+
+  final long getSerializedBytes() {
     return serBytes;
   }
 
-  public final long getEncodedBytes() {
+  final long getEncodedBytes() {
     return encodedBytes;
   }
+
+  private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
+    try {
+      serBytes += iterator.getNumSerializedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException 
e) {
+      serBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of serialized data - the 
data is not ready yet ", e);
+    }
+    try {
+      encodedBytes += iterator.getNumEncodedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException 
e) {
+      encodedBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of encoded data - the data 
is not ready yet ", e);
+    }
+  }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 817139b3d..425cb46d4 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 /**
  * Fetches data from a data source.
@@ -42,19 +43,23 @@
   @Override
   Object fetchDataElement() throws IOException {
     if (iterator == null) {
-      final long start = System.currentTimeMillis();
-      iterator = this.readable.read().iterator();
-      boundedSourceReadTime += System.currentTimeMillis() - start;
+      fetchDataLazily();
     }
 
     if (iterator.hasNext()) {
       return iterator.next();
     } else {
-      return null;
+      throw new NoSuchElementException();
     }
   }
 
-  public final long getBoundedSourceReadTime() {
+  private void fetchDataLazily() throws IOException {
+    final long start = System.currentTimeMillis();
+    iterator = this.readable.read().iterator();
+    boundedSourceReadTime += System.currentTimeMillis() - start;
+  }
+
+  final long getBoundedSourceReadTime() {
     return boundedSourceReadTime;
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 809d74787..807e5bb84 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -220,19 +220,15 @@ private void processElementRecursively(final 
VertexHarness vertexHarness, final
     }
 
     // Given a single input element, a vertex can produce many output elements.
-    // Here, we recursively process all of the main output elements.
-    while (!outputCollector.isEmpty()) {
-      final Object element = outputCollector.remove();
-      handleMainOutputElement(vertexHarness, element); // Recursion
-    }
+    // Here, we recursively process all of the main oltput elements.
+    outputCollector.iterateMain().forEach(element -> 
handleMainOutputElement(vertexHarness, element)); // Recursion
+    outputCollector.clearMain();
 
     // Recursively process all of the additional output elements.
-    
vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(value -> {
-      final String dstVertexId = (String) value;
-      while (!outputCollector.isEmpty(dstVertexId)) {
-        final Object element = outputCollector.remove(dstVertexId);
-        handleAdditionalOutputElement(vertexHarness, element, dstVertexId); // 
Recursion
-      }
+    vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(tag 
-> {
+      outputCollector.iterateTag(tag).forEach(
+          element -> handleAdditionalOutputElement(vertexHarness, element, 
tag)); // Recursion
+      outputCollector.clearTag(tag);
     });
   }
 
@@ -326,17 +322,14 @@ private void finalizeVertex(final VertexHarness 
vertexHarness) {
     final OutputCollectorImpl outputCollector = 
vertexHarness.getOutputCollector();
 
     // handle main outputs
-    while (!outputCollector.isEmpty()) {
-      final Object element = outputCollector.remove();
-      handleMainOutputElement(vertexHarness, element);
-    }
+    outputCollector.iterateMain().forEach(element -> 
handleMainOutputElement(vertexHarness, element)); // Recursion
+    outputCollector.clearMain();
 
     // handle additional tagged outputs
     vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-      while (!outputCollector.isEmpty(tag)) {
-        final Object element = outputCollector.remove(tag);
-        handleAdditionalOutputElement(vertexHarness, element, tag);
-      }
+      outputCollector.iterateTag(tag).forEach(
+          element -> handleAdditionalOutputElement(vertexHarness, element, 
tag)); // Recursion
+      outputCollector.clearTag(tag);
     });
     finalizeOutputWriters(vertexHarness);
   }
@@ -383,16 +376,11 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
       for (int i = 0; i < availableFetchers.size(); i++) {
         final DataFetcher dataFetcher = availableFetchers.get(i);
         final Object element;
+
         try {
           element = dataFetcher.fetchDataElement();
-        } catch (IOException e) {
-          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
-              Optional.empty(), 
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
-          LOG.error("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}", taskId, e.toString());
-          return false;
-        }
-
-        if (element == null) {
+        } catch (NoSuchElementException e) {
+          // We've consumed all the data from this data fetcher.
           if (dataFetcher instanceof SourceVertexDataFetcher) {
             boundedSourceReadTime += ((SourceVertexDataFetcher) 
dataFetcher).getBoundedSourceReadTime();
           } else if (dataFetcher instanceof ParentTaskDataFetcher) {
@@ -401,12 +389,19 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
           }
           finishedFetcherIndex = i;
           break;
+        } catch (IOException e) {
+          // IOException means that this task should be retried.
+          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+              Optional.empty(), 
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}", taskId, e.toString());
+          return false;
+        }
+
+        // Successfully fetched an element
+        if (dataFetcher.isFromSideInput()) {
+          sideInputMap.put(((OperatorVertex) 
dataFetcher.getDataSource()).getTransform().getTag(), element);
         } else {
-          if (dataFetcher.isFromSideInput()) {
-            sideInputMap.put(((OperatorVertex) 
dataFetcher.getDataSource()).getTransform().getTag(), element);
-          } else {
-            processElementRecursively(dataFetcher.getChild(), element);
-          }
+          processElementRecursively(dataFetcher.getChild(), element);
         }
       }
 
@@ -588,6 +583,7 @@ private void finalizeOutputWriters(final VertexHarness 
vertexHarness) {
     // finalize OutputWriters for additional tagged children
     
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter
 -> {
       outputWriter.close();
+
       final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
       writtenBytes.ifPresent(writtenBytesList::add);
     });
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 3f1fd3d2c..47c2d19b2 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -15,11 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
-import edu.snu.nemo.common.coder.DecoderFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
@@ -35,7 +31,6 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.when;
 
 /**
@@ -45,31 +40,28 @@
 @PrepareForTest({InputReader.class, VertexHarness.class})
 public final class ParentTaskDataFetcherTest {
 
-  @Test(timeout=5000)
-  public void testVoid() throws Exception {
-    // TODO #173: Properly handle zero-element. This test should be updated 
too.
-    final List<String> dataElements = new ArrayList<>(0); // empty data
-    final InputReader inputReader = 
generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
-        "VoidCoder");
+  @Test(timeout=5000, expected = NoSuchElementException.class)
+  public void testEmpty() throws Exception {
+    final List<String> empty = new ArrayList<>(0); // empty data
+    final InputReader inputReader = 
generateInputReader(generateCompletableFuture(empty.iterator()));
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
 
-    // Should return Void.TYPE
-    assertEquals(Void.TYPE, fetcher.fetchDataElement());
+    // Should trigger the expected 'NoSuchElementException'
+    fetcher.fetchDataElement();
   }
 
   @Test(timeout=5000)
-  public void testEmpty() throws Exception {
-    // TODO #173: Properly handle zero-element. This test should be updated 
too.
-    final List<String> dataElements = new ArrayList<>(0); // empty data
-    final InputReader inputReader = 
generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
-        "IntCoder");
+  public void testNull() throws Exception {
+    final List<String> oneNull = new ArrayList<>(1); // empty data
+    oneNull.add(null);
+    final InputReader inputReader = 
generateInputReader(generateCompletableFuture(oneNull.iterator()));
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
 
-    // Should return Void.TYPE
+    // Should return 'null'
     assertEquals(null, fetcher.fetchDataElement());
   }
 
@@ -86,7 +78,6 @@ public void testNonEmpty() throws Exception {
 
     // Should return only a single element
     assertEquals(singleData, fetcher.fetchDataElement());
-    assertEquals(null, fetcher.fetchDataElement());
   }
 
   @Test(timeout=5000, expected = IOException.class)
@@ -131,28 +122,6 @@ private ParentTaskDataFetcher createFetcher(final 
InputReader readerForParentTas
         false);
   }
 
-
-  private DecoderFactory generateCoder(final String coder) {
-    final DecoderFactory decoderFactory = mock(DecoderFactory.class);
-    when(decoderFactory.toString()).thenReturn(coder);
-    return decoderFactory;
-  }
-
-  private RuntimeEdge generateEdge(final String coder) {
-    final String runtimeIREdgeId = "Runtime edge with coder";
-    final ExecutionPropertyMap edgeProperties = new 
ExecutionPropertyMap(runtimeIREdgeId);
-    edgeProperties.put(DecoderProperty.of(generateCoder(coder)));
-    return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, 
mock(IRVertex.class), mock(IRVertex.class), false);
-  }
-
-  private InputReader generateInputReaderWithCoder(final CompletableFuture 
completableFuture, final String coder) {
-    final InputReader inputReader = mock(InputReader.class);
-    when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
-    final RuntimeEdge runtimeEdge = generateEdge(coder);
-    when(inputReader.getRuntimeEdge()).thenReturn(runtimeEdge);
-    return inputReader;
-  }
-
   private InputReader generateInputReader(final CompletableFuture 
completableFuture) {
     final InputReader inputReader = mock(InputReader.class);
     when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to