Repository: falcon Updated Branches: refs/heads/master 292dfed34 -> 73339264d
FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode Author: peeyush b <[email protected]> Reviewers: "Praveen Adlakha <[email protected]>, Balu Vellanki <[email protected]>, Venkat Ranganathan <[email protected]>" Closes #220 from peeyushb/FALCON-2071 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/73339264 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/73339264 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/73339264 Branch: refs/heads/master Commit: 73339264d6d8cd627aee0be53007a7bd4e4956ac Parents: 292dfed Author: peeyush b <[email protected]> Authored: Tue Jul 19 13:21:51 2016 -0700 Committer: bvellanki <[email protected]> Committed: Tue Jul 19 13:21:51 2016 -0700 ---------------------------------------------------------------------- .../process/SparkProcessWorkflowBuilder.java | 34 +++++++++----------- .../OozieProcessWorkflowBuilderTest.java | 6 ++-- 2 files changed, 20 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/73339264/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java index 5f4fafa..51db75d 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java @@ -30,12 +30,10 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.spark.CONFIGURATION.Property; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.util.OozieUtils; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import javax.xml.bind.JAXBElement; @@ -59,7 +57,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue(); String sparkMasterURL = entity.getSparkAttributes().getMaster(); - String sparkFilePath = entity.getSparkAttributes().getJar(); + Path sparkJarFilePath = new Path(entity.getSparkAttributes().getJar()); String sparkJobName = entity.getSparkAttributes().getName(); String sparkOpts = entity.getSparkAttributes().getSparkOpts(); String sparkClassName = entity.getSparkAttributes().getClazz(); @@ -94,18 +92,28 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder addOutputFeedsAsArgument(argList, cluster); addInputFeedsAsArgument(argList, cluster); - sparkAction.setJar(addUri(sparkFilePath, cluster)); - - setSparkLibFileToWorkflowLib(sparkFilePath, entity); + // In Oozie spark action, value for jar is either Java jar file path or Python file path. + validateSparkJarFilePath(sparkJarFilePath); + sparkAction.setJar(sparkJarFilePath.getName()); + setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity); propagateEntityProperties(sparkAction); OozieUtils.marshalSparkAction(action, actionJaxbElement); return action; } - private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) { + private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) { if (StringUtils.isEmpty(entity.getWorkflow().getLib())) { - entity.getWorkflow().setLib(sparkFile); + entity.getWorkflow().setLib(sparkJarFilePath); + } else { + String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath; + entity.getWorkflow().setLib(workflowLib); + } + } + + private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException { + if (!sparkJarFilePath.isAbsolute()) { + throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath); } } @@ -188,16 +196,6 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder } } - private String addUri(String jarFile, Cluster cluster) throws FalconException { - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster)); - Path jarFilePath = new Path(jarFile); - if (jarFilePath.isAbsoluteAndSchemeAuthorityNull()) { - return fs.makeQualified(jarFilePath).toString(); - } - return jarFile; - } - private String getClusterEntitySparkMaster(Cluster cluster) { return ClusterHelper.getSparkMasterEndPoint(cluster); } http://git-wip-us.apache.org/repos/asf/falcon/blob/73339264/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java index 30ff537..a692d0c 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java @@ -372,6 +372,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); testParentWorkflow(process, parentWorkflow); + assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/falcon-examples.jar"); ACTION sparkNode = getAction(parentWorkflow, "user-action"); @@ -380,7 +381,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue(); assertEquals(sparkAction.getMaster(), "local"); - assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar"); + assertEquals(sparkAction.getJar(), "falcon-examples.jar"); Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process)); List<String> argsList = sparkAction.getArg(); @@ -430,6 +431,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); testParentWorkflow(process, parentWorkflow); + assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/spark-wordcount.jar"); ACTION sparkNode = getAction(parentWorkflow, "user-action"); @@ -437,7 +439,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { OozieUtils.unMarshalSparkAction(sparkNode); org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue(); assertEquals(sparkAction.getMaster(), "local"); - assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/spark-wordcount.jar"); + assertEquals(sparkAction.getJar(), "spark-wordcount.jar"); List<String> argsList = sparkAction.getArg(); Input input = process.getInputs().getInputs().get(0); Output output = process.getOutputs().getOutputs().get(0);
