Repository: tez Updated Branches: refs/heads/master 247719d73 -> ebf3fb133
TEZ-3703. Use a sha comparison to compare vertex and dag resources, if there is a mismatch. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ebf3fb13 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ebf3fb13 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ebf3fb13 Branch: refs/heads/master Commit: ebf3fb1339e91d1efa225055ddb456fcdbf7b791 Parents: 247719d Author: Siddharth Seth <[email protected]> Authored: Mon May 1 11:58:35 2017 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon May 1 11:58:35 2017 -0700 ---------------------------------------------------------------------- .../org/apache/tez/client/TezClientUtils.java | 26 ++++++++ .../main/java/org/apache/tez/dag/api/DAG.java | 70 +++++++++++++++++--- .../org/apache/tez/dag/api/TestDAGVerify.java | 3 +- .../tez/dag/utils/RelocalizationUtils.java | 26 -------- .../org/apache/tez/dag/app/DAGAppMaster.java | 7 +- 5 files changed, 94 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index b16d1ca..ad6e413 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -20,6 +20,7 @@ package org.apache.tez.client; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -38,6 +39,7 @@ import java.util.Vector; import java.util.Map.Entry; import com.google.common.base.Strings; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.math3.util.Precision; import org.apache.tez.common.JavaOptsChecker; @@ -1077,4 +1079,28 @@ public class TezClientUtils { } } + + public static byte[] getLocalSha(Path path, Configuration conf) throws IOException { + InputStream is = null; + try { + is = FileSystem.getLocal(conf).open(path); + return DigestUtils.sha256(is); + } finally { + if (is != null) { + is.close(); + } + } + } + + public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException { + InputStream is = null; + try { + is = FileSystem.get(uri, conf).open(new Path(uri)); + return DigestUtils.sha256(is); + } finally { + if (is != null) { + is.close(); + } + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index c136811..cdfa3b2 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -17,8 +17,11 @@ */ package org.apache.tez.dag.api; +import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Deque; @@ -34,6 +37,8 @@ import java.util.Stack; import org.apache.commons.collections4.BidiMap; import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.client.CallerContext; import org.apache.tez.common.JavaOptsChecker; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; @@ -686,21 +691,69 @@ public class DAG { } // check for conflicts between dag level local resource and vertex level local resource + + + return topologicalVertexStack; + } + + @VisibleForTesting + void verifyLocalResources(Configuration tezConf) { for (Vertex v : vertices.values()) { - for (Map.Entry<String, LocalResource> localResource : v.getTaskLocalFiles().entrySet()) { + for (Map.Entry<String, LocalResource> localResource : v + .getTaskLocalFiles().entrySet()) { String resourceName = localResource.getKey(); LocalResource resource = localResource.getValue(); if (commonTaskLocalFiles.containsKey(resourceName) - && !commonTaskLocalFiles.get(resourceName).equals(resource)) { - throw new IllegalStateException("There is conflicting local resource (" + resourceName - + ") between dag local resource and vertex " + v.getName() + " local resource. " - + "\nResource of dag : " + commonTaskLocalFiles.get(resourceName) - + "\nResource of vertex: " + resource); + && !commonTaskLocalFiles.get(resourceName).equals(resource)) { + // Different for some reason. Compare size, and then eventually hash + try { + + LocalResource commonLr = commonTaskLocalFiles.get(resourceName); + if (resource.getSize() != commonLr.getSize()) { + throw new IllegalStateException( + "There is conflicting local resource (size mismatch) (" + + resourceName + + ") between dag local resource and vertex " + + v.getName() + " local resource. " + + "\nResource of dag : " + + commonTaskLocalFiles.get(resourceName) + + "\nResource of vertex: " + resource); + } + + Path vertexResourcePath = + ConverterUtils.getPathFromYarnURL(resource.getResource()); + Path commonResourcePath = ConverterUtils.getPathFromYarnURL( + commonLr.getResource()); + + byte[] vertexResourceSha = TezClientUtils + .getResourceSha(vertexResourcePath.toUri(), tezConf); + byte[] commonResourceSha = TezClientUtils + .getResourceSha(commonResourcePath.toUri(), tezConf); + + if (!Arrays.equals(vertexResourceSha, commonResourceSha)) { + throw new IllegalStateException( + "There is conflicting local resource (sha mismatch) (" + + resourceName + + ") between dag local resource and vertex " + + v.getName() + " local resource. " + + "\nResource of dag : " + + commonTaskLocalFiles.get(resourceName) + + "\nResource of vertex: " + resource); + } + + } catch (URISyntaxException | IOException e) { + throw new RuntimeException( + "Failed while attempting to validate sha for conflicting resources (" + + resourceName + + ") between dag local resource and vertex " + v.getName() + + " local resource. " + + "\nResource of dag : " + + commonTaskLocalFiles.get(resourceName) + + "\nResource of vertex: " + resource); + } } } } - - return topologicalVertexStack; } // Adaptation of Tarjan's algorithm for connected components. @@ -794,6 +847,7 @@ public class DAG { boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) { Deque<String> topologicalVertexStack = verify(true); + verifyLocalResources(tezConf); DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); dagBuilder.setName(this.name); http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index 5706542..e3c40aa 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -1167,6 +1167,7 @@ public class TestDAGVerify { dag.verify(); } + // Verifies failure in case of a file size difference. Does not verify sha differences. @Test(timeout = 5000) public void testDAGWithConflictingResource() { DAG dag = DAG.create("dag"); @@ -1185,7 +1186,7 @@ public class TestDAGVerify { dag.addVertex(v1); try { - dag.verify(); + dag.verifyLocalResources(new TezConfiguration()); Assert.fail("should report failure on conflict resources"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("There is conflicting local resource")); http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java index de0b94c..84a9474 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java +++ b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java @@ -19,7 +19,6 @@ package org.apache.tez.dag.utils; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.net.URL; import java.util.Collections; @@ -27,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -68,28 +66,4 @@ public class RelocalizationUtils { fs.copyToLocalFile(srcPath, dFile); return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd); } - - public static byte[] getLocalSha(Path path, Configuration conf) throws IOException { - InputStream is = null; - try { - is = FileSystem.getLocal(conf).open(path); - return DigestUtils.sha256(is); - } finally { - if (is != null) { - is.close(); - } - } - } - - public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException { - InputStream is = null; - try { - is = FileSystem.get(uri, conf).open(new Path(uri)); - return DigestUtils.sha256(is); - } finally { - if (is != null) { - is.close(); - } - } - } } http://git-wip-us.apache.org/repos/asf/tez/blob/ebf3fb13/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 76d9fdb..1995b16 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -67,6 +67,7 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.client.CallerContext; +import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; @@ -1454,7 +1455,7 @@ public class DAGAppMaster extends AbstractService { // The existing file must already be in usercache... let's try to find it. Path localFile = findLocalFileForResource(fileName); if (localFile != null) { - oldSha = RelocalizationUtils.getLocalSha(localFile, conf); + oldSha = TezClientUtils.getLocalSha(localFile, conf); } else { LOG.warn("Couldn't find local file for " + oldLr); } @@ -1462,11 +1463,11 @@ public class DAGAppMaster extends AbstractService { LOG.warn("Error getting SHA from local file for " + oldLr, ex); } if (oldSha == null) { // Well, no dice. - oldSha = RelocalizationUtils.getResourceSha(getLocalResourceUri(oldLr), conf); + oldSha = TezClientUtils.getResourceSha(getLocalResourceUri(oldLr), conf); } // Get the new SHA directly from Hadoop stream. If it matches, we already have the // file, and if it doesn't we are going to fail; no need to download either way. - byte[] newSha = RelocalizationUtils.getResourceSha(getLocalResourceUri(newLr), conf); + byte[] newSha = TezClientUtils.getResourceSha(getLocalResourceUri(newLr), conf); return Arrays.equals(oldSha, newSha); } });
