Repository: tez
Updated Branches:
  refs/heads/master b915a0772 -> 3d665f90c


TEZ-3784. Submitting very large DAG throws com.google.protobuf.CodedInputStream 
exception (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3d665f90
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3d665f90
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3d665f90

Branch: refs/heads/master
Commit: 3d665f90c16ede5bea7e046a31dac00f032d9e88
Parents: b915a07
Author: Rajesh Balamohan <[email protected]>
Authored: Sat Jul 8 08:40:27 2017 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Sat Jul 8 08:40:27 2017 +0530

----------------------------------------------------------------------
 .../client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java    | 6 +++++-
 .../rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java       | 5 +++--
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3d665f90/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index baac186..72cf0d5 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -23,6 +23,7 @@ import java.security.AccessControlException;
 import java.util.List;
 import java.util.Map;
 
+import com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -166,7 +167,10 @@ public class DAGClientAMProtocolBlockingPBServerImpl 
implements DAGClientAMProto
         // need to deserialize large request from hdfs
         Path requestPath = new Path(request.getSerializedRequestPath());
         try (FSDataInputStream fsDataInputStream = 
stagingFs.open(requestPath)) {
-          request = SubmitDAGRequestProto.parseFrom(fsDataInputStream);
+          CodedInputStream in =
+              CodedInputStream.newInstance(fsDataInputStream);
+          in.setSizeLimit(Integer.MAX_VALUE);
+          request = SubmitDAGRequestProto.parseFrom(in);
         } catch (IOException e) {
           throw wrapException(e);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/3d665f90/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
 
b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
index 3c030b3..040ca2f 100644
--- 
a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
+++ 
b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java
@@ -68,7 +68,7 @@ public class TestDAGClientAMProtocolBlockingPBServerImpl {
     MockitoAnnotations.initMocks(this);
   }
 
-  @Test(timeout = 5000)
+  @Test(timeout = 100000)
   @SuppressWarnings("unchecked")
   public void testSubmitDagInSessionWithLargeDagPlan() throws Exception {
     int maxIPCMsgSize = 1024;
@@ -77,7 +77,8 @@ public class TestDAGClientAMProtocolBlockingPBServerImpl {
     TezConfiguration conf = new TezConfiguration();
     conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 
maxIPCMsgSize);
 
-    byte[] randomBytes = new byte[2*maxIPCMsgSize];
+    // Check with 70 MB (64 MB is CodedInputStream's default limit in earlier 
versions of protobuf)
+    byte[] randomBytes = new byte[70 << 20];
     (new Random()).nextBytes(randomBytes);
     UserPayload payload = UserPayload.create(ByteBuffer.wrap(randomBytes));
     Vertex vertex = Vertex.create("V", 
ProcessorDescriptor.create("P").setUserPayload(payload), 1);

Reply via email to