This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new d2c5c44 [NEMO-293] OOM exception in streaming (#164)
d2c5c44 is described below
commit d2c5c4476f6319cf9d91fbc55b0e6f00d4b229ef
Author: Taegeon Um <[email protected]>
AuthorDate: Mon Dec 3 13:37:55 2018 +0900
[NEMO-293] OOM exception in streaming (#164)
JIRA: [NEMO-293: OOM exception in
streaming](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-293)
**Major changes:**
- Fix wrong byte encoding in `PipeOutputWriter`. This caused OOM because it
sent unnecessary bytes (the encoded count was <= the backing byte array size,
so trailing unused bytes were transmitted)
- Add a `writeElement` method to `ByteOutputContext` to emit data without
copying the byte array.
---
.../executor/bytetransfer/ByteOutputContext.java | 36 +++++++++++++++++++---
.../datatransfer/DataFetcherOutputCollector.java | 2 ++
.../executor/datatransfer/PipeOutputWriter.java | 15 +--------
3 files changed, 34 insertions(+), 19 deletions(-)
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 315760c..12761b2 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -18,10 +18,14 @@
*/
package org.apache.nemo.runtime.executor.bytetransfer;
+import io.netty.buffer.ByteBufOutputStream;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.FileArea;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +98,7 @@ public final class ByteOutputContext extends
ByteTransferContext implements Auto
currentByteOutputStream.close();
}
channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId()))
- .addListener(getChannelWriteListener());
+ .addListener(getChannelWriteListener());
deregister();
closed = true;
}
@@ -150,7 +154,7 @@ public final class ByteOutputContext extends
ByteTransferContext implements Auto
* @throws IOException when an exception has been set or this stream was
closed
*/
public ByteOutputStream writeSerializedPartition(final SerializedPartition
serializedPartition)
- throws IOException {
+ throws IOException {
write(serializedPartition.getData(), 0, serializedPartition.getLength());
return this;
}
@@ -177,7 +181,7 @@ public final class ByteOutputContext extends
ByteTransferContext implements Auto
}
@Override
- public synchronized void close() throws IOException {
+ public void close() throws IOException {
if (closed) {
return;
}
@@ -199,18 +203,40 @@ public final class ByteOutputContext extends
ByteTransferContext implements Auto
}
/**
+ * Write an element to the channel.
+ * @param element element
+ * @param serializer serializer
+ */
+ public void writeElement(final Object element,
+ final Serializer serializer) {
+ final ByteBuf byteBuf = channel.alloc().ioBuffer();
+ final ByteBufOutputStream byteBufOutputStream = new
ByteBufOutputStream(byteBuf);
+ try {
+ final OutputStream wrapped =
+ DataUtil.buildOutputStream(byteBufOutputStream,
serializer.getEncodeStreamChainers());
+ final EncoderFactory.Encoder encoder =
serializer.getEncoderFactory().create(wrapped);
+ encoder.encode(element);
+ wrapped.close();
+
+ writeByteBuf(byteBuf);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
* Writes a data frame.
* @param body the body or {@code null}
* @param length the length of the body, in bytes
* @throws IOException when an exception has been set or this stream was
closed
*/
- private synchronized void writeDataFrame(final Object body, final long
length) throws IOException {
+ private void writeDataFrame(final Object body, final long length) throws
IOException {
ensureNoException();
if (closed) {
throw new IOException("Stream already closed.");
}
channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(),
body, length, newSubStream))
- .addListener(getChannelWriteListener());
+ .addListener(getChannelWriteListener());
newSubStream = false;
}
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index d50ad82..9995e6a 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -37,6 +37,8 @@ public final class DataFetcherOutputCollector<O> implements
OutputCollector<O> {
/**
* It forwards output to the next operator.
* @param nextOperatorVertex next operator to emit data and watermark
+ * @param edgeIndex edge index
+ * @param watermarkManager watermark manager
*/
public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex,
final int edgeIndex,
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index 03d7470..f937975 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -18,14 +18,11 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.DirectByteArrayOutputStream;
-import org.apache.nemo.common.coder.EncoderFactory;
import
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
-import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
@@ -33,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -79,16 +75,7 @@ public final class PipeOutputWriter implements OutputWriter {
private void writeData(final Object element, final List<ByteOutputContext>
pipeList) {
pipeList.forEach(pipe -> {
try (final ByteOutputContext.ByteOutputStream pipeToWriteTo =
pipe.newOutputStream()) {
- // Serialize (Do not compress)
- final DirectByteArrayOutputStream bytesOutputStream = new
DirectByteArrayOutputStream();
- final OutputStream wrapped =
- DataUtil.buildOutputStream(bytesOutputStream,
serializer.getEncodeStreamChainers());
- final EncoderFactory.Encoder encoder =
serializer.getEncoderFactory().create(wrapped);
- encoder.encode(element);
- wrapped.close();
-
- // Write
- pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
+ pipeToWriteTo.writeElement(element, serializer);
} catch (IOException e) {
throw new RuntimeException(e); // For now we crash the executor on
IOException
}