[hotfix] Fix unnecessary stream wrapping for Netty Error Message Frames
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d46c9d3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d46c9d3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d46c9d3 Branch: refs/heads/master Commit: 1d46c9d3623519e92b9cd7f147f15cbde527fb65 Parents: 1db14fc Author: Stephan Ewen <[email protected]> Authored: Sun Oct 23 18:56:27 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 26 12:36:12 2016 +0200 ---------------------------------------------------------------------- .../runtime/io/network/netty/NettyMessage.java | 233 +------------------ 1 file changed, 10 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1d46c9d3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index 2b03f1d..b97bd82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -28,10 +28,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.MessageToMessageDecoder; -import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; + import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; @@ -289,17 +286,9 @@ abstract class NettyMessage { @Override ByteBuf write(ByteBufAllocator allocator) throws IOException { - ByteBuf result = null; - - ObjectOutputStream oos = null; - - try { - result = allocateBuffer(allocator, ID); - - DataOutputView outputView = new ByteBufDataOutputView(result); - - oos = new ObjectOutputStream(new DataOutputViewStream(outputView)); + final ByteBuf result = allocateBuffer(allocator, ID); + try (ObjectOutputStream oos = new ObjectOutputStream(new ByteBufOutputStream(result))) { oos.writeObject(cause); if (receiverId != null) { @@ -311,30 +300,22 @@ abstract class NettyMessage { // Update frame length... result.setInt(0, result.readableBytes()); + return result; } catch (Throwable t) { - if (result != null) { - result.release(); - } + result.release(); - throw new IOException(t); - } finally { - if(oos != null) { - oos.close(); + if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException(t); } } - - return result; } @Override void readFrom(ByteBuf buffer) throws Exception { - DataInputView inputView = new ByteBufDataInputView(buffer); - ObjectInputStream ois = null; - - try { - ois = new ObjectInputStream(new DataInputViewStream(inputView)); - + try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) { Object obj = ois.readObject(); if (!(obj instanceof Throwable)) { @@ -347,10 +328,6 @@ abstract class NettyMessage { receiverId = InputChannelID.fromByteBuf(buffer); } } - } finally { - if (ois != null) { - ois.close(); - } } } } @@ -540,194 +517,4 @@ abstract class NettyMessage { void readFrom(ByteBuf buffer) throws Exception { } } - - // ------------------------------------------------------------------------ - - private static class ByteBufDataInputView implements DataInputView { - - private final ByteBufInputStream inputView; - - public ByteBufDataInputView(ByteBuf buffer) { - this.inputView = new ByteBufInputStream(buffer); - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return inputView.read(b, off, len); - } - - @Override - public int read(byte[] b) throws IOException { - return inputView.read(b); - } - - @Override - public void readFully(byte[] b) throws IOException { - inputView.readFully(b); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - inputView.readFully(b, off, len); - } - - @Override - public int skipBytes(int n) throws IOException { - return inputView.skipBytes(n); - } - - @Override - public boolean readBoolean() throws IOException { - return inputView.readBoolean(); - } - - @Override - public byte readByte() throws IOException { - return inputView.readByte(); - } - - @Override - public int readUnsignedByte() throws IOException { - return inputView.readUnsignedByte(); - } - - @Override - public short readShort() throws IOException { - return inputView.readShort(); - } - - @Override - public int readUnsignedShort() throws IOException { - return inputView.readUnsignedShort(); - } - - @Override - public char readChar() throws IOException { - return inputView.readChar(); - } - - @Override - public int readInt() throws IOException { - return inputView.readInt(); - } - - @Override - public long readLong() throws IOException { - return inputView.readLong(); - } - - @Override - public float readFloat() throws IOException { - return inputView.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return inputView.readDouble(); - } - - @Override - public String readLine() throws IOException { - return inputView.readLine(); - } - - @Override - public String readUTF() throws IOException { - return inputView.readUTF(); - } - } - - private static class ByteBufDataOutputView implements DataOutputView { - - private final ByteBufOutputStream outputView; - - public ByteBufDataOutputView(ByteBuf buffer) { - this.outputView = new ByteBufOutputStream(buffer); - } - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void write(int b) throws IOException { - outputView.write(b); - } - - @Override - public void write(byte[] b) throws IOException { - outputView.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - outputView.write(b, off, len); - } - - @Override - public void writeBoolean(boolean v) throws IOException { - outputView.writeBoolean(v); - } - - @Override - public void writeByte(int v) throws IOException { - outputView.writeByte(v); - } - - @Override - public void writeShort(int v) throws IOException { - outputView.writeShort(v); - } - - @Override - public void writeChar(int v) throws IOException { - outputView.writeChar(v); - } - - @Override - public void writeInt(int v) throws IOException { - outputView.writeInt(v); - } - - @Override - public void writeLong(long v) throws IOException { - outputView.writeLong(v); - } - - @Override - public void writeFloat(float v) throws IOException { - outputView.writeFloat(v); - } - - @Override - public void writeDouble(double v) throws IOException { - outputView.writeDouble(v); - } - - @Override - public void writeBytes(String s) throws IOException { - outputView.writeBytes(s); - } - - @Override - public void writeChars(String s) throws IOException { - outputView.writeChars(s); - } - - @Override - public void writeUTF(String s) throws IOException { - outputView.writeUTF(s); - } - } }
