This is an automated email from the ASF dual-hosted git repository.
asalamon74 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 095e1f6 OOZIE-3440 [action] Oozie Spark action replaces path symlink # to %23 (dionusos via asalamon74)
095e1f6 is described below
commit 095e1f68f72693101d27e9b69f5adb3d5d0aab37
Author: Andras Salamon <[email protected]>
AuthorDate: Thu Feb 28 14:08:30 2019 +0100
OOZIE-3440 [action] Oozie Spark action replaces path symlink # to %23 (dionusos via asalamon74)
---
release-log.txt | 1 +
.../oozie/action/hadoop/SparkArgsExtractor.java | 21 +++++-
.../action/hadoop/TestSparkArgsExtractor.java | 79 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 2 deletions(-)
diff --git a/release-log.txt b/release-log.txt
index 7e42807..71d63fb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.2.0 release (trunk - unreleased)
+OOZIE-3440 [action] Oozie Spark action replaces path symlink # to %23
(dionusos via asalamon74)
OOZIE-3409 Oozie Server : Memory leak in EL evaluation (asalamon74 via kmarton)
OOZIE-3441 Upgrade jackson version to 2.6.5 (nobigo via asalamon74)
OOZIE-3326 [action] Sqoop Action should support tez delegation tokens for
hive-import (bgoerlitz, dionusos via asalamon74)
diff --git
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
index 28d9c5c..42f920b 100644
---
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
+++
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -318,7 +318,7 @@ class SparkArgsExtractor {
jarFilter.filter();
jarPath = jarFilter.getApplicationJar();
- final String cachedFiles = StringUtils.join(fixedFileUris,
OPT_VALUE_SEPARATOR);
+ final String cachedFiles =
StringUtils.join(decodeUriPaths(fixedFileUris), OPT_VALUE_SEPARATOR);
if (cachedFiles != null && !cachedFiles.isEmpty()) {
sparkArgs.add(FILES_OPTION);
sparkArgs.add(cachedFiles);
@@ -326,7 +326,7 @@ class SparkArgsExtractor {
final Map<String, URI> fixedArchiveUrisMap =
SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
getCacheArchives(actionConf));
addUserDefined(userArchives.toString(), fixedArchiveUrisMap);
- final String cachedArchives =
StringUtils.join(fixedArchiveUrisMap.values(), OPT_VALUE_SEPARATOR);
+ final String cachedArchives =
StringUtils.join(decodeUriPaths(fixedArchiveUrisMap.values()),
OPT_VALUE_SEPARATOR);
if (cachedArchives != null && !cachedArchives.isEmpty()) {
sparkArgs.add(ARCHIVES_OPTION);
sparkArgs.add(cachedArchives);
@@ -488,6 +488,23 @@ class SparkArgsExtractor {
}
}
+ @VisibleForTesting
+ Collection<String> decodeUriPaths(final Collection<URI> uris) {
+ if (uris == null || uris.isEmpty()) {
+ return new ArrayList<>();
+ }
+ final Collection<String> result = new ArrayList<>(uris.size());
+ for (final URI uri : uris) {
+ final String uriString = uri.toString();
+ if (!uri.getPath().equals(uri.getRawPath())) {
+ result.add(uriString.replace(uri.getRawPath(), uri.getPath()));
+ } else {
+ result.add(uriString);
+ }
+ }
+ return result;
+ }
+
/*
* Get properties that needs to be passed to Spark as Spark configuration
from actionConf.
*/
diff --git
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
index 7ccd26a..474ecbf 100644
---
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
+++
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.action.hadoop;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Test;
@@ -27,14 +28,18 @@ import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import static
org.apache.oozie.action.hadoop.SparkArgsExtractor.SPARK_DEFAULTS_GENERATED_PROPERTIES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -393,6 +398,80 @@ public class TestSparkArgsExtractor {
sparkArgs);
}
+ private static final String LOCAL_FILE =
"/tmp/local_file.txt#local_file.txt";
+ private static final String HDFS_FILE = "hdfs:///hadoop/file.pq#file.pq";
+
+ @Test
+ public void testHashMarkInFilesPath() throws
OozieActionConfiguratorException, IOException, URISyntaxException {
+ doTestingHashMarkInPath("--files");
+ }
+
+ @Test
+ public void testHashMarkInArchivesPath() throws
OozieActionConfiguratorException, IOException, URISyntaxException {
+ doTestingHashMarkInPath("--archives");
+ }
+
+ private void doTestingHashMarkInPath(final String option)
+ throws OozieActionConfiguratorException, IOException,
URISyntaxException {
+ Configuration actionConf = createSparkActionConfWithCustomSparkOpts(
+ String.format("%s %s,%s", option, LOCAL_FILE, HDFS_FILE));
+ List<String> sparkArgs = new
SparkArgsExtractor(actionConf).extract(new String[0]);
+ assertForFilePaths(sparkArgs, LOCAL_FILE, option);
+ assertForFilePaths(sparkArgs, HDFS_FILE, option);
+
+ actionConf = createSparkActionConfWithCustomSparkOpts(
+ String.format("%s=%s,%s", option, LOCAL_FILE, HDFS_FILE));
+ sparkArgs = new SparkArgsExtractor(actionConf).extract(new String[0]);
+ assertForFilePaths(sparkArgs, LOCAL_FILE, option);
+ assertForFilePaths(sparkArgs, HDFS_FILE, option);
+ }
+
+ @Test
+ public void testIfUrisAreDecoded() {
+ final SparkArgsExtractor extractor = new SparkArgsExtractor(new
Configuration());
+ final Collection<String> result =
extractor.decodeUriPaths(Arrays.asList(
+ new Path(LOCAL_FILE).toUri(),
+ new Path(HDFS_FILE).toUri()
+ ));
+
+ assertTrue(result + " shall contain " + LOCAL_FILE,
result.contains(LOCAL_FILE));
+ assertTrue(result + " shall contain " + HDFS_FILE,
result.contains(HDFS_FILE));
+ }
+
+ @Test
+ public void testDecodeUriPathsNullInput() {
+ assertTrue("In case providing empty or null input, empty list shall be
returned",
+ new SparkArgsExtractor(new
Configuration()).decodeUriPaths(null).isEmpty());
+ }
+
+ private void assertForFilePaths(final List<String> collection, final
String path, final String option) {
+ final String positive = path;
+ final String negative = path.replace("#", "%23");
+
+ final Iterator<String> iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ String elem = iterator.next();
+ if (elem != null && elem.equals(option)) {
+ elem = iterator.next();
+ assertTrue(positive + " shall be present in " + collection,
elem.contains(positive));
+ assertFalse(negative + " shall not be present in " +
collection, elem.contains(negative));
+ return;
+ }
+ }
+ fail(String.format("Neither %s nor %s present in %s.", positive,
negative, collection));
+ }
+
+ private Configuration createSparkActionConfWithCustomSparkOpts(final
String sparkOpts) {
+ final Configuration actionConf = new Configuration();
+ actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+ actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+ actionConf.set(SparkActionExecutor.SPARK_CLASS,
"org.apache.oozie.example.SparkFileCopy");
+ actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+ actionConf.set(SparkActionExecutor.SPARK_OPTS, sparkOpts);
+ return actionConf;
+ }
+
private void assertContainsSublist(final List<String> expected, final
List<String> actual) {
final int sublistSize = expected.size();
assertTrue("actual size is below expected size", actual.size() >=
sublistSize);