ayushtkn commented on code in PR #341:
URL: https://github.com/apache/tez/pull/341#discussion_r1547274765


##########
tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java:
##########
@@ -56,9 +62,12 @@ public class TestMRInputHelpers {
   private static Path oldSplitsDir;
   private static Path newSplitsDir;
 
-  private static String TEST_ROOT_DIR = "target"
+  private static final String TEST_ROOT_DIR = "target"
       + Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir";
 
+  private static final Path LOCAL_TEST_ROOT_DIR = new Path("target"
+      + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");
+

Review Comment:
   This creates a path and then resolves it against the local FS. Calling ``Files.createTempDirectory`` in the test might have saved you from these multiple steps.
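
   A rough sketch of what I mean, assuming the test only needs a scratch directory on the local disk. The class and method names (`TempDirSketch`, `createSplitsDir`, `deleteRecursively`) and the directory prefix are hypothetical and not part of the PR; only standard `java.nio.file` calls are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper, not part of the PR.
public class TempDirSketch {

  // Creates a unique directory under java.io.tmpdir; it exists on return.
  public static Path createSplitsDir() throws IOException {
    return Files.createTempDirectory("TestMRInputHelpers-");
  }

  // Deletes the directory tree, children before parents.
  public static void deleteRecursively(Path root) throws IOException {
    try (Stream<Path> paths = Files.walk(root)) {
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = createSplitsDir();
    System.out.println("created=" + Files.isDirectory(dir));
    deleteRecursively(dir);
    System.out.println("existsAfterCleanup=" + Files.exists(dir));
  }
}
```

   Note that this is a `java.nio.file.Path`; the test would still have to convert it to a Hadoop `Path` (for example via `new org.apache.hadoop.fs.Path(dir.toUri())`) before handing it to the helpers.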



##########
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java:
##########
@@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
     return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
   }
 
+  public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
+    return Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromPayload(initEvent)
+      : readProtoFromFs(initEvent, jobConf);
+  }
+
+  private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
+    String serializedPath = initEvent.getSerializedPath();
+    Path filePath = new Path(serializedPath);
+    LOG.info("Reading InputDataInformationEvent from path: {}", filePath);
+
+    MRSplitProto splitProto = null;
+    FileSystem fs = FileSystem.get(filePath.toUri(), jobConf);
+
+    try (FSDataInputStream in = fs.open(filePath)) {
+      splitProto = MRSplitProto.parseFrom(in);
+      fs.delete(filePath, false);
+    }
+    return splitProto;
+  }
+
+  private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
+    ByteBuffer payload = initEvent.getUserPayload();
+    LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit());

Review Comment:
   ``initEvent.getUserPayload()`` can also return `null`; there is logic inside it that allows this. If it returns `null`, this log line will throw an NPE. Maybe just pass `payload` itself to the log: if it is not `null`, the `ByteBuffer` `toString()` prints the limit along with other details.
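
   To illustrate, a minimal sketch with a hypothetical helper (`PayloadLogSketch.describe` is not in the PR). It builds the message with plain string concatenation; passing `payload` as the `{}` argument of the logger should behave the same way, since a `null` argument is rendered as `null`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the null-safe message; not part of the PR.
public class PayloadLogSketch {

  // String.valueOf(Object) yields "null" for a null reference and otherwise
  // delegates to ByteBuffer.toString(), which reports pos, lim and cap.
  public static String describe(ByteBuffer payload) {
    return "Reading InputDataInformationEvent from payload: " + String.valueOf(payload);
  }

  public static void main(String[] args) {
    System.out.println(describe(null));
    System.out.println(describe(ByteBuffer.allocate(16)));
  }
}
```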



##########
tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java:
##########
@@ -87,7 +87,12 @@ public interface InputInitializerContext {
    * @return Resource
    */
   Resource getVertexTaskResource();
-  
+
+  /**
+   * Get the vertex id as integer that belongs to this input.
+   */
+  int getVertexId();

Review Comment:
   Where is this being used?



##########
tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java:
##########
@@ -135,15 +138,22 @@ public static VertexManagerEvent convertVertexManagerEventFromProto(
     if (event.getUserPayload() != null) {
       builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
     }
+    if (event.getSerializedPath() != null) {
+      builder.setSerializedPath(ByteString.copyFrom(event.getSerializedPath().getBytes(Charsets.UTF_8)));
+    }
     return builder.build();
   }
 
-  public static InputDataInformationEvent
-      convertRootInputDataInformationEventFromProto(
+  public static InputDataInformationEvent convertRootInputDataInformationEventFromProto(
       EventProtos.RootInputDataInformationEventProto proto) {
-    InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
-        proto.getSourceIndex(),
-        proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null);
+    ByteBuffer payload = proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null;
+    InputDataInformationEvent diEvent = null;
+    if (!proto.getSerializedPath().isEmpty()) {

Review Comment:
   Should there be a ``proto.hasSerializedPath()`` check?



##########
tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java:
##########
@@ -135,15 +138,22 @@ public static VertexManagerEvent convertVertexManagerEventFromProto(
     if (event.getUserPayload() != null) {
       builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
     }
+    if (event.getSerializedPath() != null) {
+      builder.setSerializedPath(ByteString.copyFrom(event.getSerializedPath().getBytes(Charsets.UTF_8)));
+    }
     return builder.build();
   }
 
-  public static InputDataInformationEvent
-      convertRootInputDataInformationEventFromProto(
+  public static InputDataInformationEvent convertRootInputDataInformationEventFromProto(
       EventProtos.RootInputDataInformationEventProto proto) {
-    InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
-        proto.getSourceIndex(),
-        proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null);
+    ByteBuffer payload = proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null;
+    InputDataInformationEvent diEvent = null;
+    if (!proto.getSerializedPath().isEmpty()) {

Review Comment:
   At this point, it is possible that the payload was also non-`null`, but we ignored it because there was a `serializedPath`.
   Should we throw an exception in that case?
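
   One way this could look, as a sketch only. `EventSourceCheckSketch`, its `Source` enum and `resolve` are hypothetical names for illustration, and the choice of `IllegalArgumentException` is an assumption, not something the PR prescribes.

```java
import java.nio.ByteBuffer;

// Hypothetical validation helper; names are illustrative and not from the PR.
public class EventSourceCheckSketch {

  public enum Source { PAYLOAD, PATH, NONE }

  // Rejects the ambiguous case instead of silently preferring the path.
  public static Source resolve(ByteBuffer payload, String serializedPath) {
    boolean hasPath = serializedPath != null && !serializedPath.isEmpty();
    boolean hasPayload = payload != null;
    if (hasPath && hasPayload) {
      throw new IllegalArgumentException(
          "Both userPayload and serializedPath are set; expected only one");
    }
    if (hasPath) {
      return Source.PATH;
    }
    return hasPayload ? Source.PAYLOAD : Source.NONE;
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, "/tmp/split.bin"));
    System.out.println(resolve(ByteBuffer.allocate(4), ""));
  }
}
```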



##########
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java:
##########
@@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
     return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
   }
 
+  public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
+    return Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromPayload(initEvent)
+      : readProtoFromFs(initEvent, jobConf);
+  }
+
+  private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
+    String serializedPath = initEvent.getSerializedPath();
+    Path filePath = new Path(serializedPath);
+    LOG.info("Reading InputDataInformationEvent from path: {}", filePath);
+
+    MRSplitProto splitProto = null;
+    FileSystem fs = FileSystem.get(filePath.toUri(), jobConf);

Review Comment:
   Change to
   ```
        FileSystem fs = filePath.getFileSystem(jobConf);
   ```



##########
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java:
##########
@@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
     return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
   }
 
+  public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
+    return Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromPayload(initEvent)
+      : readProtoFromFs(initEvent, jobConf);
+  }
+
+  private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
+    String serializedPath = initEvent.getSerializedPath();
+    Path filePath = new Path(serializedPath);
+    LOG.info("Reading InputDataInformationEvent from path: {}", filePath);
+
+    MRSplitProto splitProto = null;
+    FileSystem fs = FileSystem.get(filePath.toUri(), jobConf);
+
+    try (FSDataInputStream in = fs.open(filePath)) {
+      splitProto = MRSplitProto.parseFrom(in);
+      fs.delete(filePath, false);

Review Comment:
   If the parsing fails, the file won't be deleted. Is that expected? And if the delete fails, our operation will fail as well. Is that OK, or can we live with that?
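
   If neither behaviour is intended, something along these lines might work. This is a stand-in sketch that uses `java.nio.file` instead of Hadoop's `FileSystem` so that it is self-contained (it assumes Java 9+ for `readAllBytes`); `ReadThenDeleteSketch` and `readAndCleanUp` are hypothetical names, and reading raw bytes stands in for `MRSplitProto.parseFrom(in)`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Stand-in sketch using java.nio.file instead of Hadoop's FileSystem.
public class ReadThenDeleteSketch {

  // The stream is closed first, then the finally block removes the file even
  // if reading threw. A failed delete is reported but does not fail the read.
  public static byte[] readAndCleanUp(Path file) throws IOException {
    try (InputStream in = Files.newInputStream(file)) {
      return in.readAllBytes(); // stand-in for MRSplitProto.parseFrom(in)
    } finally {
      try {
        Files.deleteIfExists(file);
      } catch (IOException e) {
        System.err.println("Could not delete " + file + ": " + e);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("split-", ".bin");
    Files.write(file, new byte[] {1, 2, 3});
    System.out.println("bytesRead=" + readAndCleanUp(file).length);
    System.out.println("existsAfterRead=" + Files.exists(file));
  }
}
```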



##########
tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java:
##########
@@ -232,30 +278,32 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits
   @Test(timeout = 5000)
   public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception {
     FileSystem localFs = FileSystem.getLocal(conf);
-    Path LOCAL_TEST_ROOT_DIR = new Path("target"
-        + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");
+    Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);
 
-    try {
-      localFs.mkdirs(LOCAL_TEST_ROOT_DIR);
-
-      Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);
+    DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir);
 
-      DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir);
-
-      Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();
+    Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();
 
-      Assert.assertEquals(2, localResources.size());
-      Assert.assertTrue(localResources.containsKey(
-          MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
-      Assert.assertTrue(localResources.containsKey(
-          MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
+    Assert.assertEquals(2, localResources.size());
+    Assert.assertTrue(localResources.containsKey(
+        MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
+    Assert.assertTrue(localResources.containsKey(
+        MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
 
-      for (LocalResource lr : localResources.values()) {
-        Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme()));
-      }
-    } finally {
-      localFs.delete(LOCAL_TEST_ROOT_DIR, true);
+    for (LocalResource lr : localResources.values()) {
+      Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme()));
     }
   }
 
+  @Before
+  public void before() throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);

Review Comment:
   We are getting the `localFs` multiple times. Can we store it as a class variable, just like `remoteFs`?



##########
tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java:
##########
@@ -103,6 +113,7 @@ public Object getDeserializedUserPayload() {
   public String toString() {
     return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
         + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
-        + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
-  } 
+        + ", deserializedUserPayloadExists=" + (userPayloadObject != null)
+        + serializedPath != null ? (", serializedPath=" + serializedPath) : "" + "]";

Review Comment:
   This is wrong.
   ```
   "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
           + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
           + ", deserializedUserPayloadExists=" + (userPayloadObject != null)
           + serializedPath != null ? <Something>
   ```
   
   The `+` operators bind tighter than `!=`, so this compares the whole concatenated string before the `!=` with `null`. That is always true, so it will always return just ``(", serializedPath=" + serializedPath)``.
   
   You need a bracket here:
   ```
       return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
           + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
           + ", deserializedUserPayloadExists=" + (userPayloadObject != null)
           + (serializedPath != null ? (", serializedPath=" + serializedPath) : "") + "]";
   ```
   
   That is a lot of concatenation mixed with conditions. Maybe we can explore `StringBuilder`, which might be more readable, something like this:
   ```
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("InputDataInformationEvent [sourceIndex=").append(sourceIndex)
           .append(", targetIndex=").append(targetIndex)
           .append(", serializedUserPayloadExists=").append(userPayload != null)
           .append(", deserializedUserPayloadExists=").append(userPayloadObject != null);
       if (serializedPath != null) {
         sb.append(", serializedPath=").append(serializedPath);
       }
       sb.append("]");
       return sb.toString();
     }
   ```
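
   For anyone who wants to see the precedence problem in isolation, here is a small stand-alone sketch. `TernaryPrecedenceSketch` and the shortened `Event [...]` message are hypothetical; only the shape of the expression mirrors the PR.

```java
// Hypothetical demo class; only the expression shape mirrors the PR.
public class TernaryPrecedenceSketch {

  // Same shape as the PR: '+' binds tighter than '!=', so the whole
  // concatenation is compared with null and the condition is always true.
  public static String buggy(int sourceIndex, String serializedPath) {
    return "Event [sourceIndex=" + sourceIndex
        + serializedPath != null ? (", serializedPath=" + serializedPath) : "" + "]";
  }

  // Parenthesizing the conditional restores the intended grouping.
  public static String fixed(int sourceIndex, String serializedPath) {
    return "Event [sourceIndex=" + sourceIndex
        + (serializedPath != null ? (", serializedPath=" + serializedPath) : "") + "]";
  }

  public static void main(String[] args) {
    System.out.println(buggy(1, null));
    System.out.println(fixed(1, null));
    System.out.println(fixed(1, "/tmp/split.bin"));
  }
}
```

   With a `null` path, `buggy` drops the prefix and the closing bracket entirely, while `fixed` produces the expected `Event [sourceIndex=1]`.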



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

