This is an automated email from the ASF dual-hosted git repository.

asalamon74 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 095e1f6  OOZIE-3440 [action] Oozie Spark action replaces path symlink # to %23 (dionusos via asalamon74)
095e1f6 is described below

commit 095e1f68f72693101d27e9b69f5adb3d5d0aab37
Author: Andras Salamon <[email protected]>
AuthorDate: Thu Feb 28 14:08:30 2019 +0100

    OOZIE-3440 [action] Oozie Spark action replaces path symlink # to %23 (dionusos via asalamon74)
---
 release-log.txt                                    |  1 +
 .../oozie/action/hadoop/SparkArgsExtractor.java    | 21 +++++-
 .../action/hadoop/TestSparkArgsExtractor.java      | 79 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/release-log.txt b/release-log.txt
index 7e42807..71d63fb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3440 [action] Oozie Spark action replaces path symlink # to %23 (dionusos via asalamon74)
 OOZIE-3409 Oozie Server : Memory leak in EL evaluation (asalamon74 via kmarton)
 OOZIE-3441 Upgrade jackson version to 2.6.5 (nobigo via asalamon74)
 OOZIE-3326 [action] Sqoop Action should support tez delegation tokens for hive-import (bgoerlitz, dionusos via asalamon74)
diff --git 
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
 
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
index 28d9c5c..42f920b 100644
--- 
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
+++ 
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -318,7 +318,7 @@ class SparkArgsExtractor {
             jarFilter.filter();
             jarPath = jarFilter.getApplicationJar();
 
-            final String cachedFiles = StringUtils.join(fixedFileUris, 
OPT_VALUE_SEPARATOR);
+            final String cachedFiles = 
StringUtils.join(decodeUriPaths(fixedFileUris), OPT_VALUE_SEPARATOR);
             if (cachedFiles != null && !cachedFiles.isEmpty()) {
                 sparkArgs.add(FILES_OPTION);
                 sparkArgs.add(cachedFiles);
@@ -326,7 +326,7 @@ class SparkArgsExtractor {
             final Map<String, URI> fixedArchiveUrisMap = 
SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
                     getCacheArchives(actionConf));
             addUserDefined(userArchives.toString(), fixedArchiveUrisMap);
-            final String cachedArchives = 
StringUtils.join(fixedArchiveUrisMap.values(), OPT_VALUE_SEPARATOR);
+            final String cachedArchives = 
StringUtils.join(decodeUriPaths(fixedArchiveUrisMap.values()), 
OPT_VALUE_SEPARATOR);
             if (cachedArchives != null && !cachedArchives.isEmpty()) {
                 sparkArgs.add(ARCHIVES_OPTION);
                 sparkArgs.add(cachedArchives);
@@ -488,6 +488,23 @@ class SparkArgsExtractor {
         }
     }
 
+    @VisibleForTesting
+    Collection<String> decodeUriPaths(final Collection<URI> uris) {
+        if (uris == null || uris.isEmpty()) {
+            return new ArrayList<>();
+        }
+        final Collection<String> result = new ArrayList<>(uris.size());
+        for (final URI uri : uris) {
+            final String uriString = uri.toString();
+            if (!uri.getPath().equals(uri.getRawPath())) {
+                result.add(uriString.replace(uri.getRawPath(), uri.getPath()));
+            } else {
+                result.add(uriString);
+            }
+        }
+        return result;
+    }
+
     /*
      * Get properties that needs to be passed to Spark as Spark configuration 
from actionConf.
      */
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
index 7ccd26a..474ecbf 100644
--- 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.action.hadoop;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.junit.After;
 import org.junit.Test;
 
@@ -27,14 +28,18 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
 import static 
org.apache.oozie.action.hadoop.SparkArgsExtractor.SPARK_DEFAULTS_GENERATED_PROPERTIES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -393,6 +398,80 @@ public class TestSparkArgsExtractor {
                 sparkArgs);
     }
 
+    private static final String LOCAL_FILE = 
"/tmp/local_file.txt#local_file.txt";
+    private static final String HDFS_FILE = "hdfs:///hadoop/file.pq#file.pq";
+
+    @Test
+    public void testHashMarkInFilesPath() throws 
OozieActionConfiguratorException, IOException, URISyntaxException {
+        doTestingHashMarkInPath("--files");
+    }
+
+    @Test
+    public void testHashMarkInArchivesPath() throws 
OozieActionConfiguratorException, IOException, URISyntaxException {
+        doTestingHashMarkInPath("--archives");
+    }
+
+    private void doTestingHashMarkInPath(final String option)
+            throws OozieActionConfiguratorException, IOException, 
URISyntaxException {
+        Configuration actionConf = createSparkActionConfWithCustomSparkOpts(
+                String.format("%s %s,%s", option, LOCAL_FILE, HDFS_FILE));
+        List<String> sparkArgs = new 
SparkArgsExtractor(actionConf).extract(new String[0]);
+        assertForFilePaths(sparkArgs, LOCAL_FILE, option);
+        assertForFilePaths(sparkArgs, HDFS_FILE, option);
+
+        actionConf = createSparkActionConfWithCustomSparkOpts(
+                String.format("%s=%s,%s", option, LOCAL_FILE, HDFS_FILE));
+        sparkArgs = new SparkArgsExtractor(actionConf).extract(new String[0]);
+        assertForFilePaths(sparkArgs, LOCAL_FILE, option);
+        assertForFilePaths(sparkArgs, HDFS_FILE, option);
+    }
+
+    @Test
+    public void testIfUrisAreDecoded() {
+        final SparkArgsExtractor extractor = new SparkArgsExtractor(new 
Configuration());
+        final Collection<String> result = 
extractor.decodeUriPaths(Arrays.asList(
+                new Path(LOCAL_FILE).toUri(),
+                new Path(HDFS_FILE).toUri()
+        ));
+
+        assertTrue(result + " shall contain " + LOCAL_FILE, 
result.contains(LOCAL_FILE));
+        assertTrue(result + " shall contain " + HDFS_FILE, 
result.contains(HDFS_FILE));
+    }
+
+    @Test
+    public void testDecodeUriPathsNullInput() {
+        assertTrue("In case providing empty or null input, empty list shall be 
returned",
+                new SparkArgsExtractor(new 
Configuration()).decodeUriPaths(null).isEmpty());
+    }
+
+    private void assertForFilePaths(final List<String> collection, final 
String path, final String option) {
+        final String positive = path;
+        final String negative = path.replace("#", "%23");
+
+        final Iterator<String> iterator = collection.iterator();
+        while (iterator.hasNext()) {
+            String elem = iterator.next();
+            if (elem != null && elem.equals(option)) {
+                elem = iterator.next();
+                assertTrue(positive + " shall be present in " + collection, 
elem.contains(positive));
+                assertFalse(negative + " shall not be present in " + 
collection, elem.contains(negative));
+                return;
+            }
+        }
+        fail(String.format("Neither %s nor %s present in %s.", positive, 
negative, collection));
+    }
+
+    private Configuration createSparkActionConfWithCustomSparkOpts(final 
String sparkOpts) {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, 
"org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, sparkOpts);
+        return actionConf;
+    }
+
     private void assertContainsSublist(final List<String> expected, final 
List<String> actual) {
         final int sublistSize = expected.size();
         assertTrue("actual size is below expected size", actual.size() >= 
sublistSize);

Reply via email to