Repository: tez Updated Branches: refs/heads/branch-0.9.0 26ddde6cb -> d1fc0d766 (forced update)
TEZ-3784. Submitting very large DAG throws com.google.protobuf.CodedInputStream exception (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3d665f90 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3d665f90 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3d665f90 Branch: refs/heads/branch-0.9.0 Commit: 3d665f90c16ede5bea7e046a31dac00f032d9e88 Parents: b915a07 Author: Rajesh Balamohan <[email protected]> Authored: Sat Jul 8 08:40:27 2017 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Sat Jul 8 08:40:27 2017 +0530 ---------------------------------------------------------------------- .../client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java | 6 +++++- .../rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3d665f90/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index baac186..72cf0d5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -23,6 +23,7 @@ import java.security.AccessControlException; import java.util.List; import java.util.Map; +import com.google.protobuf.CodedInputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -166,7 +167,10 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto // need to deserialize large request from hdfs Path requestPath = new Path(request.getSerializedRequestPath()); try (FSDataInputStream fsDataInputStream = stagingFs.open(requestPath)) { - request = SubmitDAGRequestProto.parseFrom(fsDataInputStream); + CodedInputStream in = + CodedInputStream.newInstance(fsDataInputStream); + in.setSizeLimit(Integer.MAX_VALUE); + request = SubmitDAGRequestProto.parseFrom(in); } catch (IOException e) { throw wrapException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/3d665f90/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java index 3c030b3..040ca2f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java @@ -68,7 +68,7 @@ public class TestDAGClientAMProtocolBlockingPBServerImpl { MockitoAnnotations.initMocks(this); } - @Test(timeout = 5000) + @Test(timeout = 100000) @SuppressWarnings("unchecked") public void testSubmitDagInSessionWithLargeDagPlan() throws Exception { int maxIPCMsgSize = 1024; @@ -77,7 +77,8 @@ public class TestDAGClientAMProtocolBlockingPBServerImpl { TezConfiguration conf = new TezConfiguration(); conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxIPCMsgSize); - byte[] randomBytes = new byte[2*maxIPCMsgSize]; + // Check with 70 MB (64 MB is CodedInputStream's default limit in earlier versions of protobuf) + byte[] randomBytes = new byte[70 << 20]; (new Random()).nextBytes(randomBytes); UserPayload payload = UserPayload.create(ByteBuffer.wrap(randomBytes)); Vertex vertex = Vertex.create("V", ProcessorDescriptor.create("P").setUserPayload(payload), 1);
