This is an automated email from the ASF dual-hosted git repository. jeongyoon pushed a commit to branch 717-TGE in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 106211a014318414d9386dae00d81ab727e930c1 Author: Jeongyoon Eo <[email protected]> AuthorDate: Mon Mar 5 16:56:41 2018 +0900 fix DoTransfrom and OutputWriter --- .../nemo/compiler/frontend/beam/transform/DoTransform.java | 14 ++++++++------ .../edu/snu/nemo/runtime/executor/TaskGroupExecutor.java | 2 -- .../nemo/runtime/executor/datatransfer/OutputWriter.java | 14 ++++---------- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java index 663920f..aa56964 100644 --- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java +++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java @@ -47,6 +47,10 @@ public final class DoTransform<I, O> implements Transform<I, O> { private final String serializedOptions; private Map<PCollectionView, Object> sideInputs; private Pipe<O> pipe; + private StartBundleContext startBundleContext; + private FinishBundleContext finishBundleContext; + private ProcessContext processContext; + private DoFnInvoker invoker; /** * DoTransform Constructor. @@ -68,14 +72,14 @@ public final class DoTransform<I, O> implements Transform<I, O> { this.pipe = p; this.sideInputs = new HashMap<>(); context.getSideInputs().forEach((k, v) -> this.sideInputs.put(((CreateViewTransform) k).getTag(), v)); + this.startBundleContext = new StartBundleContext(doFn, serializedOptions); + this.finishBundleContext = new FinishBundleContext(doFn, pipe, serializedOptions); + this.processContext = new ProcessContext(doFn, pipe, sideInputs, serializedOptions); + this.invoker = DoFnInvokers.invokerFor(doFn); } @Override public void onData(final Object data) { - final StartBundleContext startBundleContext = new StartBundleContext(doFn, serializedOptions); - final FinishBundleContext finishBundleContext = new FinishBundleContext(doFn, pipe, serializedOptions); - final ProcessContext processContext = new ProcessContext(doFn, pipe, sideInputs, serializedOptions); - final DoFnInvoker invoker = DoFnInvokers.invokerFor(doFn); invoker.invokeSetup(); invoker.invokeStartBundle(startBundleContext); if (data instanceof Iterable) { @@ -93,10 +97,8 @@ public final class DoTransform<I, O> implements Transform<I, O> { @Override public void close() { - // do nothing } - @Override public String toString() { final StringBuilder sb = new StringBuilder(); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java index 75c743b..1b7f7eb 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java @@ -483,13 +483,11 @@ public final class TaskGroupExecutor { long boundedSrcReadEndTime = 0; long inputReadStartTime = 0; long inputReadEndTime = 0; - if (isExecutionRequested) { throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!"); } else { isExecutionRequested = true; } - taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty()); LOG.info("{} Executing!", taskGroupId); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java index 7495373..6f9a594 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java @@ -42,7 +42,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable { private final List<Long> accumulatedPartitionSizeInfo; private final List<Long> writtenBytes; private final BlockManagerWorker blockManagerWorker; - private final ArrayDeque<Object> outputQueue; + private final List<Object> outputList; public OutputWriter(final int hashRangeMultiplier, final int srcTaskIdx, @@ -59,7 +59,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable { this.blockManagerWorker = blockManagerWorker; this.blockStoreValue = runtimeEdge.getProperty(ExecutionProperty.Key.DataStore); this.partitionerMap = new HashMap<>(); - this.outputQueue = new ArrayDeque<>(); + this.outputList = new ArrayList<>(); this.writtenBytes = new ArrayList<>(); // TODO #511: Refactor metric aggregation for (general) run-rime optimization. this.accumulatedPartitionSizeInfo = new ArrayList<>(); @@ -71,7 +71,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable { } public void writeElement(final Object element) { - outputQueue.add(element); + outputList.add(element); } /** @@ -79,12 +79,6 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable { **/ public void write() { // Aggregate element to form the inter-Stage data. - List<Object> dataToWrite = new ArrayList<>(); - while (outputQueue.size() > 0) { - Object output = outputQueue.remove(); - dataToWrite.add(output); - } - final Boolean isDataSizeMetricCollectionEdge = MetricCollectionProperty.Value.DataSkewRuntimePass .equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection)); @@ -109,7 +103,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable { && duplicateDataProperty.getGroupSize() > 1) { partitionsToWrite = partitioner.partition(Collections.emptyList(), dstParallelism, keyExtractor); } else { - partitionsToWrite = partitioner.partition(dataToWrite, dstParallelism, keyExtractor); + partitionsToWrite = partitioner.partition(outputList, dstParallelism, keyExtractor); } // Write the grouped blocks into partitions. -- To stop receiving notification emails like this one, please contact [email protected].
