Repository: tez
Updated Branches:
  refs/heads/master 247719d73 -> ebf3fb133


TEZ-3703. Use a SHA comparison to compare vertex and DAG resources if
there is a mismatch. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ebf3fb13
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ebf3fb13
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ebf3fb13

Branch: refs/heads/master
Commit: ebf3fb1339e91d1efa225055ddb456fcdbf7b791
Parents: 247719d
Author: Siddharth Seth <[email protected]>
Authored: Mon May 1 11:58:35 2017 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Mon May 1 11:58:35 2017 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   | 26 ++++++++
 .../main/java/org/apache/tez/dag/api/DAG.java   | 70 +++++++++++++++++---
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  3 +-
 .../tez/dag/utils/RelocalizationUtils.java      | 26 --------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  7 +-
 5 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java 
b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index b16d1ca..ad6e413 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -20,6 +20,7 @@ package org.apache.tez.client;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -38,6 +39,7 @@ import java.util.Vector;
 import java.util.Map.Entry;
 
 import com.google.common.base.Strings;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.math3.util.Precision;
 import org.apache.tez.common.JavaOptsChecker;
@@ -1077,4 +1079,31 @@ public class TezClientUtils {
     }
   }
 
+  /**
+   * Computes the SHA-256 digest of a file read through the local filesystem.
+   *
+   * @param path path of the file to digest
+   * @param conf configuration used to obtain the local filesystem
+   * @return the SHA-256 digest of the file contents
+   * @throws IOException if the file cannot be opened or read
+   */
+  public static byte[] getLocalSha(Path path, Configuration conf) throws IOException {
+    try (InputStream is = FileSystem.getLocal(conf).open(path)) {
+      return DigestUtils.sha256(is);
+    }
+  }
+
+  /**
+   * Computes the SHA-256 digest of a resource read through {@code FileSystem.get(uri, conf)}.
+   *
+   * @param uri URI of the resource to digest
+   * @param conf configuration used to obtain the filesystem for {@code uri}
+   * @return the SHA-256 digest of the resource contents
+   * @throws IOException if the resource cannot be opened or read
+   */
+  public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException {
+    try (InputStream is = FileSystem.get(uri, conf).open(new Path(uri))) {
+      return DigestUtils.sha256(is);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c136811..cdfa3b2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -17,8 +17,11 @@
  */
 package org.apache.tez.dag.api;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -34,6 +37,8 @@ import java.util.Stack;
 import org.apache.commons.collections4.BidiMap;
 import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.common.JavaOptsChecker;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
@@ -686,21 +691,81 @@ public class DAG {
     }
 
+    return topologicalVertexStack;
+  }
+
+  /**
+   * Checks that each task-local resource declared on a vertex is compatible with the
+   * DAG-level task-local resource registered under the same name, if any.
+   * <p>
+   * When the two {@link LocalResource} records are not equal, a size difference is reported as
+   * a conflict; otherwise the SHA-256 digests of the referenced files are compared, so records
+   * that are not equal but refer to byte-identical content are accepted.
+   *
+   * @param tezConf configuration used to access the filesystems holding the resources
+   * @throws IllegalStateException if a vertex resource conflicts with the DAG-level resource
+   * @throws RuntimeException if a resource path cannot be resolved or read while digesting
+   */
+  @VisibleForTesting
+  void verifyLocalResources(Configuration tezConf) {
     // check for conflicts between dag level local resource and vertex level local resource
     for (Vertex v : vertices.values()) {
       for (Map.Entry<String, LocalResource> localResource : v.getTaskLocalFiles().entrySet()) {
         String resourceName = localResource.getKey();
         LocalResource resource = localResource.getValue();
         if (commonTaskLocalFiles.containsKey(resourceName)
           && !commonTaskLocalFiles.get(resourceName).equals(resource)) {
-          throw new IllegalStateException("There is conflicting local resource (" + resourceName
-            + ") between dag local resource and vertex " + v.getName() + " local resource. "
-            + "\nResource of dag : " + commonTaskLocalFiles.get(resourceName)
-            + "\nResource of vertex: " + resource);
+          // The records differ in some attribute. A size difference is a definite conflict;
+          // otherwise compare the content digests of the referenced files.
+          LocalResource commonLr = commonTaskLocalFiles.get(resourceName);
+          if (resource.getSize() != commonLr.getSize()) {
+            throw new IllegalStateException(buildLocalResourceConflictMessage(
+                "There is conflicting local resource (size mismatch) (", resourceName,
+                v.getName(), commonLr, resource));
+          }
+          try {
+            Path vertexResourcePath =
+                ConverterUtils.getPathFromYarnURL(resource.getResource());
+            Path commonResourcePath =
+                ConverterUtils.getPathFromYarnURL(commonLr.getResource());
+
+            byte[] vertexResourceSha =
+                TezClientUtils.getResourceSha(vertexResourcePath.toUri(), tezConf);
+            byte[] commonResourceSha =
+                TezClientUtils.getResourceSha(commonResourcePath.toUri(), tezConf);
+
+            if (!Arrays.equals(vertexResourceSha, commonResourceSha)) {
+              throw new IllegalStateException(buildLocalResourceConflictMessage(
+                  "There is conflicting local resource (sha mismatch) (", resourceName,
+                  v.getName(), commonLr, resource));
+            }
+          } catch (URISyntaxException | IOException e) {
+            // Keep the underlying failure as the cause so it is not lost in diagnostics.
+            throw new RuntimeException(buildLocalResourceConflictMessage(
+                "Failed while attempting to validate sha for conflicting resources (",
+                resourceName, v.getName(), commonLr, resource), e);
+          }
         }
       }
     }
-
-    return topologicalVertexStack;
   }
+
+  /**
+   * Builds the diagnostic text reported when a vertex-level local resource and the DAG-level
+   * local resource of the same name do not match.
+   *
+   * @param prefix leading text of the message, ending just before the resource name
+   * @param resourceName name under which both resources are registered
+   * @param vertexName name of the vertex declaring the conflicting resource
+   * @param dagResource the DAG-level resource
+   * @param vertexResource the vertex-level resource
+   * @return the formatted message
+   */
+  private static String buildLocalResourceConflictMessage(String prefix, String resourceName,
+      String vertexName, LocalResource dagResource, LocalResource vertexResource) {
+    return prefix + resourceName
+        + ") between dag local resource and vertex " + vertexName + " local resource. "
+        + "\nResource of dag : " + dagResource
+        + "\nResource of vertex: " + vertexResource;
+  }
 
   // Adaptation of Tarjan's algorithm for connected components.
@@ -794,6 +859,7 @@ public class DAG {
       boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor,
       JavaOptsChecker javaOptsChecker) {
     Deque<String> topologicalVertexStack = verify(true);
+    verifyLocalResources(tezConf);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
     dagBuilder.setName(this.name);

http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java 
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 5706542..e3c40aa 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -1167,6 +1167,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
+  // Verifies failure in case of a file size difference. Does not verify sha 
differences.
   @Test(timeout = 5000)
   public void testDAGWithConflictingResource() {
     DAG dag = DAG.create("dag");
@@ -1185,7 +1186,7 @@ public class TestDAGVerify {
     dag.addVertex(v1);
 
     try {
-      dag.verify();
+      dag.verifyLocalResources(new TezConfiguration());
       Assert.fail("should report failure on conflict resources");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("There is conflicting local 
resource"));

http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java 
b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
index de0b94c..84a9474 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.utils;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -27,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,28 +66,4 @@ public class RelocalizationUtils {
     fs.copyToLocalFile(srcPath, dFile);
     return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
   }
-
-  public static byte[] getLocalSha(Path path, Configuration conf) throws 
IOException {
-    InputStream is = null;
-    try {
-      is = FileSystem.getLocal(conf).open(path);
-      return DigestUtils.sha256(is);
-    } finally {
-      if (is != null) {
-        is.close();
-      }
-    }
-  }
-
-  public static byte[] getResourceSha(URI uri, Configuration conf) throws 
IOException {
-    InputStream is = null;
-    try {
-      is = FileSystem.get(uri, conf).open(new Path(uri));
-      return DigestUtils.sha256(is);
-    } finally {
-      if (is != null) {
-        is.close();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 76d9fdb..1995b16 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -67,6 +67,7 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.client.CallerContext;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
@@ -1454,7 +1455,7 @@ public class DAGAppMaster extends AbstractService {
             // The existing file must already be in usercache... let's try to 
find it.
             Path localFile = findLocalFileForResource(fileName);
             if (localFile != null) {
-              oldSha = RelocalizationUtils.getLocalSha(localFile, conf);
+              oldSha = TezClientUtils.getLocalSha(localFile, conf);
             } else {
               LOG.warn("Couldn't find local file for " + oldLr);
             }
@@ -1462,11 +1463,11 @@ public class DAGAppMaster extends AbstractService {
             LOG.warn("Error getting SHA from local file for " + oldLr, ex);
           }
           if (oldSha == null) { // Well, no dice.
-            oldSha = 
RelocalizationUtils.getResourceSha(getLocalResourceUri(oldLr), conf);
+            oldSha = TezClientUtils.getResourceSha(getLocalResourceUri(oldLr), 
conf);
           }
           // Get the new SHA directly from Hadoop stream. If it matches, we 
already have the
           // file, and if it doesn't we are going to fail; no need to download 
either way.
-          byte[] newSha = 
RelocalizationUtils.getResourceSha(getLocalResourceUri(newLr), conf);
+          byte[] newSha = 
TezClientUtils.getResourceSha(getLocalResourceUri(newLr), conf);
           return Arrays.equals(oldSha, newSha);
         }
       });

Reply via email to