[hotfix] Fix unnecessary stream wrapping for Netty Error Message Frames

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d46c9d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d46c9d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d46c9d3

Branch: refs/heads/master
Commit: 1d46c9d3623519e92b9cd7f147f15cbde527fb65
Parents: 1db14fc
Author: Stephan Ewen <[email protected]>
Authored: Sun Oct 23 18:56:27 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 26 12:36:12 2016 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 233 +------------------
 1 file changed, 10 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d46c9d3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2b03f1d..b97bd82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -28,10 +28,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -289,17 +286,9 @@ abstract class NettyMessage {
 
                @Override
                ByteBuf write(ByteBufAllocator allocator) throws IOException {
-                       ByteBuf result = null;
-
-                       ObjectOutputStream oos = null;
-
-                       try {
-                               result = allocateBuffer(allocator, ID);
-
-                               DataOutputView outputView = new ByteBufDataOutputView(result);
-
-                               oos = new ObjectOutputStream(new DataOutputViewStream(outputView));
+                       final ByteBuf result = allocateBuffer(allocator, ID);
 
+                       try (ObjectOutputStream oos = new ObjectOutputStream(new ByteBufOutputStream(result))) {
                                oos.writeObject(cause);
 
                                if (receiverId != null) {
@@ -311,30 +300,22 @@ abstract class NettyMessage {
 
                                // Update frame length...
                                result.setInt(0, result.readableBytes());
+                               return result;
                        }
                        catch (Throwable t) {
-                               if (result != null) {
-                                       result.release();
-                               }
+                               result.release();
 
-                               throw new IOException(t);
-                       } finally {
-                               if(oos != null) {
-                                       oos.close();
+                               if (t instanceof IOException) {
+                                       throw (IOException) t;
+                               } else {
+                                       throw new IOException(t);
                                }
                        }
-
-                       return result;
                }
 
                @Override
                void readFrom(ByteBuf buffer) throws Exception {
-                       DataInputView inputView = new ByteBufDataInputView(buffer);
-                       ObjectInputStream ois = null;
-
-                       try {
-                               ois = new ObjectInputStream(new DataInputViewStream(inputView));
-
+                       try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) {
                                Object obj = ois.readObject();
 
                                if (!(obj instanceof Throwable)) {
@@ -347,10 +328,6 @@ abstract class NettyMessage {
                                                receiverId = InputChannelID.fromByteBuf(buffer);
                                        }
                                }
-                       } finally {
-                               if (ois != null) {
-                                       ois.close();
-                               }
                        }
                }
        }
@@ -540,194 +517,4 @@ abstract class NettyMessage {
                void readFrom(ByteBuf buffer) throws Exception {
                }
        }
-
-       // ------------------------------------------------------------------------
-
-       private static class ByteBufDataInputView implements DataInputView {
-
-               private final ByteBufInputStream inputView;
-
-               public ByteBufDataInputView(ByteBuf buffer) {
-                       this.inputView = new ByteBufInputStream(buffer);
-               }
-
-               @Override
-               public void skipBytesToRead(int numBytes) throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int read(byte[] b, int off, int len) throws IOException {
-                       return inputView.read(b, off, len);
-               }
-
-               @Override
-               public int read(byte[] b) throws IOException {
-                       return inputView.read(b);
-               }
-
-               @Override
-               public void readFully(byte[] b) throws IOException {
-                       inputView.readFully(b);
-               }
-
-               @Override
-               public void readFully(byte[] b, int off, int len) throws IOException {
-                       inputView.readFully(b, off, len);
-               }
-
-               @Override
-               public int skipBytes(int n) throws IOException {
-                       return inputView.skipBytes(n);
-               }
-
-               @Override
-               public boolean readBoolean() throws IOException {
-                       return inputView.readBoolean();
-               }
-
-               @Override
-               public byte readByte() throws IOException {
-                       return inputView.readByte();
-               }
-
-               @Override
-               public int readUnsignedByte() throws IOException {
-                       return inputView.readUnsignedByte();
-               }
-
-               @Override
-               public short readShort() throws IOException {
-                       return inputView.readShort();
-               }
-
-               @Override
-               public int readUnsignedShort() throws IOException {
-                       return inputView.readUnsignedShort();
-               }
-
-               @Override
-               public char readChar() throws IOException {
-                       return inputView.readChar();
-               }
-
-               @Override
-               public int readInt() throws IOException {
-                       return inputView.readInt();
-               }
-
-               @Override
-               public long readLong() throws IOException {
-                       return inputView.readLong();
-               }
-
-               @Override
-               public float readFloat() throws IOException {
-                       return inputView.readFloat();
-               }
-
-               @Override
-               public double readDouble() throws IOException {
-                       return inputView.readDouble();
-               }
-
-               @Override
-               public String readLine() throws IOException {
-                       return inputView.readLine();
-               }
-
-               @Override
-               public String readUTF() throws IOException {
-                       return inputView.readUTF();
-               }
-       }
-
-       private static class ByteBufDataOutputView implements DataOutputView {
-
-               private final ByteBufOutputStream outputView;
-
-               public ByteBufDataOutputView(ByteBuf buffer) {
-                       this.outputView = new ByteBufOutputStream(buffer);
-               }
-
-               @Override
-               public void skipBytesToWrite(int numBytes) throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public void write(DataInputView source, int numBytes) throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       outputView.write(b);
-               }
-
-               @Override
-               public void write(byte[] b) throws IOException {
-                       outputView.write(b);
-               }
-
-               @Override
-               public void write(byte[] b, int off, int len) throws IOException {
-                       outputView.write(b, off, len);
-               }
-
-               @Override
-               public void writeBoolean(boolean v) throws IOException {
-                       outputView.writeBoolean(v);
-               }
-
-               @Override
-               public void writeByte(int v) throws IOException {
-                       outputView.writeByte(v);
-               }
-
-               @Override
-               public void writeShort(int v) throws IOException {
-                       outputView.writeShort(v);
-               }
-
-               @Override
-               public void writeChar(int v) throws IOException {
-                       outputView.writeChar(v);
-               }
-
-               @Override
-               public void writeInt(int v) throws IOException {
-                       outputView.writeInt(v);
-               }
-
-               @Override
-               public void writeLong(long v) throws IOException {
-                       outputView.writeLong(v);
-               }
-
-               @Override
-               public void writeFloat(float v) throws IOException {
-                       outputView.writeFloat(v);
-               }
-
-               @Override
-               public void writeDouble(double v) throws IOException {
-                       outputView.writeDouble(v);
-               }
-
-               @Override
-               public void writeBytes(String s) throws IOException {
-                       outputView.writeBytes(s);
-               }
-
-               @Override
-               public void writeChars(String s) throws IOException {
-                       outputView.writeChars(s);
-               }
-
-               @Override
-               public void writeUTF(String s) throws IOException {
-                       outputView.writeUTF(s);
-               }
-       }
 }

Reply via email to