This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 106211a014318414d9386dae00d81ab727e930c1
Author: Jeongyoon Eo <[email protected]>
AuthorDate: Mon Mar 5 16:56:41 2018 +0900

    fix DoTransform and OutputWriter
---
 .../nemo/compiler/frontend/beam/transform/DoTransform.java | 14 ++++++++------
 .../edu/snu/nemo/runtime/executor/TaskGroupExecutor.java   |  2 --
 .../nemo/runtime/executor/datatransfer/OutputWriter.java   | 14 ++++----------
 3 files changed, 12 insertions(+), 18 deletions(-)

diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
index 663920f..aa56964 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
@@ -47,6 +47,10 @@ public final class DoTransform<I, O> implements Transform<I, 
O> {
   private final String serializedOptions;
   private Map<PCollectionView, Object> sideInputs;
   private Pipe<O> pipe;
+  private StartBundleContext startBundleContext;
+  private FinishBundleContext finishBundleContext;
+  private ProcessContext processContext;
+  private DoFnInvoker invoker;
 
   /**
    * DoTransform Constructor.
@@ -68,14 +72,14 @@ public final class DoTransform<I, O> implements 
Transform<I, O> {
     this.pipe = p;
     this.sideInputs = new HashMap<>();
     context.getSideInputs().forEach((k, v) -> 
this.sideInputs.put(((CreateViewTransform) k).getTag(), v));
+    this.startBundleContext = new StartBundleContext(doFn, serializedOptions);
+    this.finishBundleContext = new FinishBundleContext(doFn, pipe, 
serializedOptions);
+    this.processContext = new ProcessContext(doFn, pipe, sideInputs, 
serializedOptions);
+    this.invoker = DoFnInvokers.invokerFor(doFn);
   }
 
   @Override
   public void onData(final Object data) {
-    final StartBundleContext startBundleContext = new StartBundleContext(doFn, 
serializedOptions);
-    final FinishBundleContext finishBundleContext = new 
FinishBundleContext(doFn, pipe, serializedOptions);
-    final ProcessContext processContext = new ProcessContext(doFn, pipe, 
sideInputs, serializedOptions);
-    final DoFnInvoker invoker = DoFnInvokers.invokerFor(doFn);
     invoker.invokeSetup();
     invoker.invokeStartBundle(startBundleContext);
     if (data instanceof Iterable) {
@@ -93,10 +97,8 @@ public final class DoTransform<I, O> implements Transform<I, 
O> {
 
   @Override
   public void close() {
-    // do nothing
   }
 
-
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 75c743b..1b7f7eb 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -483,13 +483,11 @@ public final class TaskGroupExecutor {
     long boundedSrcReadEndTime = 0;
     long inputReadStartTime = 0;
     long inputReadEndTime = 0;
-
     if (isExecutionRequested) {
       throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
     } else {
       isExecutionRequested = true;
     }
-
     
taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.EXECUTING, 
Optional.empty(), Optional.empty());
     LOG.info("{} Executing!", taskGroupId);
 
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 7495373..6f9a594 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -42,7 +42,7 @@ public final class OutputWriter extends DataTransfer 
implements AutoCloseable {
   private final List<Long> accumulatedPartitionSizeInfo;
   private final List<Long> writtenBytes;
   private final BlockManagerWorker blockManagerWorker;
-  private final ArrayDeque<Object> outputQueue;
+  private final List<Object> outputList;
 
   public OutputWriter(final int hashRangeMultiplier,
                       final int srcTaskIdx,
@@ -59,7 +59,7 @@ public final class OutputWriter extends DataTransfer 
implements AutoCloseable {
     this.blockManagerWorker = blockManagerWorker;
     this.blockStoreValue = 
runtimeEdge.getProperty(ExecutionProperty.Key.DataStore);
     this.partitionerMap = new HashMap<>();
-    this.outputQueue = new ArrayDeque<>();
+    this.outputList = new ArrayList<>();
     this.writtenBytes = new ArrayList<>();
     // TODO #511: Refactor metric aggregation for (general) run-rime 
optimization.
     this.accumulatedPartitionSizeInfo = new ArrayList<>();
@@ -71,7 +71,7 @@ public final class OutputWriter extends DataTransfer 
implements AutoCloseable {
   }
 
   public void writeElement(final Object element) {
-    outputQueue.add(element);
+    outputList.add(element);
   }
 
   /**
@@ -79,12 +79,6 @@ public final class OutputWriter extends DataTransfer 
implements AutoCloseable {
    **/
   public void write() {
     // Aggregate element to form the inter-Stage data.
-    List<Object> dataToWrite = new ArrayList<>();
-    while (outputQueue.size() > 0) {
-      Object output = outputQueue.remove();
-      dataToWrite.add(output);
-    }
-
     final Boolean isDataSizeMetricCollectionEdge = 
MetricCollectionProperty.Value.DataSkewRuntimePass
         
.equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
 
@@ -109,7 +103,7 @@ public final class OutputWriter extends DataTransfer 
implements AutoCloseable {
         && duplicateDataProperty.getGroupSize() > 1) {
       partitionsToWrite = partitioner.partition(Collections.emptyList(), 
dstParallelism, keyExtractor);
     } else {
-      partitionsToWrite = partitioner.partition(dataToWrite, dstParallelism, 
keyExtractor);
+      partitionsToWrite = partitioner.partition(outputList, dstParallelism, 
keyExtractor);
     }
 
     // Write the grouped blocks into partitions.

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to