This is an automated email from the ASF dual-hosted git repository.
mkwhit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new ef0c7e8 CRUNCH-685 Use whitelist and blacklist for .fileSystem()
properties (#25)
ef0c7e8 is described below
commit ef0c7e882921c2c99fcd0feb9a50d8438c327822
Author: Ben Roling <[email protected]>
AuthorDate: Fri Jul 12 16:36:57 2019 -0500
CRUNCH-685 Use whitelist and blacklist for .fileSystem() properties (#25)
* CRUNCH-685 Use whitelist and blacklist for .fileSystem() properties
* CRUNCH-685 fix noisy logging
* CRUNCH-686 Fix FormatBundle to hide redacted properties
---
.../org/apache/crunch/ExternalFilesystemIT.java | 2 +-
.../java/org/apache/crunch/io/FormatBundle.java | 166 +++++++++++++++++++--
.../org/apache/crunch/io/impl/FileSourceImpl.java | 13 +-
.../org/apache/crunch/io/impl/FileTargetImpl.java | 12 +-
.../org/apache/crunch/io/FormatBundleTest.java | 138 +++++++++++++++++
5 files changed, 298 insertions(+), 33 deletions(-)
diff --git
a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
index 75a2837..0ca396c 100644
--- a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
@@ -186,7 +186,7 @@ public class ExternalFilesystemIT {
}
private static Configuration getDfsConf(String nsName, MiniDFSCluster
cluster) {
- Configuration conf = new Configuration();
+ Configuration conf = new Configuration(false);
conf.set("dfs.nameservices", nsName);
conf.set("dfs.client.failover.proxy.provider." + nsName,
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index 0b50080..1a89f8d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -17,6 +17,8 @@
*/
package org.apache.crunch.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -25,25 +27,31 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.google.common.collect.Maps;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A combination of an {@link InputFormat} or {@link OutputFormat} and any
extra
@@ -55,9 +63,30 @@ import com.google.common.collect.Maps;
*/
public class FormatBundle<K> implements Serializable, Writable, Configurable {
+ private final Logger LOG = LoggerFactory.getLogger(FormatBundle.class);
+ /**
+ * A comma-separated list of properties whose value will be redacted.
+ * MR config to redact job conf properties: https://issues.apache.org/jira/browse/MAPREDUCE-6741
+ */
+ private static final String MR_JOB_REDACTED_PROPERTIES =
"mapreduce.job.redacted-properties";
+ private static final String REDACTION_REPLACEMENT_VAL =
"*********(redacted)";
+
+ private final String FILESYSTEM_BLACKLIST_PATTERNS_KEY =
"crunch.fs.props.blacklist.patterns";
+ private final String[] FILESYSTEM_BLACKLIST_PATTERNS_DEFAULT =
+ new String[] {
+ "^fs\\.defaultFS$",
+ "^fs\\.default\\.name$"};
+
+ private final String FILESYSTEM_WHITELIST_PATTERNS_KEY =
"crunch.fs.props.whitelist.patterns";
+ private final String[] FILESYSTEM_WHITELIST_PATTERNS_DEFAULT =
+ new String[] {
+ "^fs\\..*",
+ "^dfs\\..*"};
+
private Class<K> formatClass;
private Map<String, String> extraConf;
private Configuration conf;
+ private FileSystem fileSystem;
public static <T> FormatBundle<T> fromSerialized(String serialized,
Configuration conf) {
ByteArrayInputStream bais = new
ByteArrayInputStream(Base64.decodeBase64(serialized));
@@ -82,8 +111,9 @@ public class FormatBundle<K> implements Serializable,
Writable, Configurable {
public FormatBundle() {
// For Writable support
}
-
- private FormatBundle(Class<K> formatClass) {
+
+ @VisibleForTesting
+ FormatBundle(Class<K> formatClass) {
this.formatClass = formatClass;
this.extraConf = Maps.newHashMap();
}
@@ -93,26 +123,121 @@ public class FormatBundle<K> implements Serializable,
Writable, Configurable {
return this;
}
+ public FormatBundle<K> setFileSystem(FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ return this;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
public Class<K> getFormatClass() {
return formatClass;
}
public Configuration configure(Configuration conf) {
+ // first configure filesystem properties
+ Map<String, String> appliedFsProperties = configureFileSystem(conf);
+
+ // then apply extraConf properties
for (Map.Entry<String, String> e : extraConf.entrySet()) {
String key = e.getKey();
String value = e.getValue();
- // merge the value if it is DFS_NAMESERVICES to support additional
filesystems
+ conf.set(key, value);
+ if (appliedFsProperties.get(key) != null) {
+ LOG.info("{}={} from extraConf overrode {}={} from filesystem conf",
+ new Object[] {key, value, key, appliedFsProperties.get(key)});
+ }
+ }
+ return conf;
+ }
+
+ private Map<String,String> configureFileSystem(Configuration conf) {
+ if (fileSystem == null) {
+ return Collections.emptyMap();
+ }
+
+ Collection<Pattern> blacklistPatterns =
+ compilePatterns(
+ conf.getStrings(FILESYSTEM_BLACKLIST_PATTERNS_KEY,
+ FILESYSTEM_BLACKLIST_PATTERNS_DEFAULT));
+ Collection<Pattern> whitelistPatterns =
+ compilePatterns(
+ conf.getStrings(FILESYSTEM_WHITELIST_PATTERNS_KEY,
+ FILESYSTEM_WHITELIST_PATTERNS_DEFAULT));
+
+ Configuration fileSystemConf = fileSystem.getConf();
+ Map<String, String> appliedProperties = new HashMap<>();
+ Collection<String> redactedProperties =
conf.getTrimmedStringCollection(MR_JOB_REDACTED_PROPERTIES);
+
+ for (Entry<String, String> e : fileSystemConf) {
+ String key = e.getKey();
+ String value = fileSystemConf.get(key);
+ String originalValue = conf.get(key);
+
+ if (value.equals(originalValue)) {
+ continue;
+ }
+
+ Pattern matchingBlacklistPattern = matchingPattern(key,
blacklistPatterns);
+ if (matchingBlacklistPattern != null) {
+ LOG.info("{}={} matches blacklist pattern '{}', omitted",
+ new Object[] {key, value, matchingBlacklistPattern});
+ continue;
+ }
+ Pattern matchingWhitelistPattern = matchingPattern(key,
whitelistPatterns);
+ if (matchingWhitelistPattern == null) {
+ LOG.info("{}={} matches no whitelist pattern from {}, omitted",
+ new Object[] {key, value, whitelistPatterns});
+ continue;
+ }
+
if (key.equals(DFSConfigKeys.DFS_NAMESERVICES)) {
- String[] originalValue = conf.getStrings(key);
+ String[] originalArrayValue = conf.getStrings(key);
if (originalValue != null) {
String[] newValue = value != null ? value.split(",") : new String[0];
- conf.setStrings(key, mergeValues(originalValue, newValue));
+ String[] merged = mergeValues(originalArrayValue, newValue);
+ LOG.info("Merged '{}' into '{}' with result '{}'",
+ new Object[] {newValue, DFSConfigKeys.DFS_NAMESERVICES, merged});
+ conf.setStrings(key, merged);
+ appliedProperties.put(key, StringUtils.arrayToString(merged));
continue;
}
}
+
+ String message = "Applied {}={} from FS '{}'";
+ if (originalValue != null) {
+ message += ", overriding '{}'";
+ }
+ if (redactedProperties.contains(key)) {
+ LOG.info(message,
+ new Object[]{key, REDACTION_REPLACEMENT_VAL, fileSystem.getUri(),
REDACTION_REPLACEMENT_VAL});
+ } else {
+ LOG.info(message,
+ new Object[]{key, value, fileSystem.getUri(), originalValue});
+ }
conf.set(key, value);
+ appliedProperties.put(key, value);
}
- return conf;
+ return appliedProperties;
+ }
+
+ private static Pattern matchingPattern(String s, Collection<Pattern>
patterns) {
+ for (Pattern pattern : patterns) {
+ if (pattern.matcher(s).find()) {
+ return pattern;
+ }
+ }
+ return null;
+ }
+
+ private static Collection<Pattern> compilePatterns(String[] patterns) {
+ Collection<Pattern> compiledPatterns = new ArrayList<>(patterns.length);
+ for (String pattern : patterns) {
+ compiledPatterns.add(Pattern.compile(pattern));
+ }
+ return compiledPatterns;
}
private static String[] mergeValues(String[] value1, String[] value2) {
@@ -139,7 +264,9 @@ public class FormatBundle<K> implements Serializable,
Writable, Configurable {
@Override
public int hashCode() {
- return new
HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
+ return new HashCodeBuilder().append(formatClass)
+ .append(fileSystem)
+ .append(extraConf).toHashCode();
}
@Override
@@ -148,7 +275,9 @@ public class FormatBundle<K> implements Serializable,
Writable, Configurable {
return false;
}
FormatBundle<K> oib = (FormatBundle<K>) other;
- return Objects.equals(formatClass, oib.formatClass) &&
Objects.equals(extraConf, oib.extraConf);
+ return Objects.equals(formatClass, oib.formatClass)
+ && Objects.equals(fileSystem, oib.fileSystem)
+ && Objects.equals(extraConf, oib.extraConf);
}
@Override
@@ -161,6 +290,12 @@ public class FormatBundle<K> implements Serializable,
Writable, Configurable {
String value = Text.readString(in);
extraConf.put(key, value);
}
+ if (in.readBoolean()) {
+ String fileSystemUri = Text.readString(in);
+ Configuration filesystemConf = new Configuration(false);
+ filesystemConf.readFields(in);
+ this.fileSystem = FileSystem.get(URI.create(fileSystemUri),
filesystemConf);
+ }
}
@Override
@@ -171,6 +306,11 @@ public class FormatBundle<K> implements Serializable,
Writable, Configurable {
Text.writeString(out, e.getKey());
Text.writeString(out, e.getValue());
}
+ out.writeBoolean(fileSystem != null);
+ if (fileSystem != null) {
+ Text.writeString(out, fileSystem.getUri().toString());
+ fileSystem.getConf().write(out);
+ }
}
private Class readClass(DataInput in) throws IOException {
diff --git
a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 98c0fb8..7788da5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -55,7 +55,6 @@ public class FileSourceImpl<T> implements ReadableSource<T> {
protected List<Path> paths;
protected final PType<T> ptype;
protected final FormatBundle<? extends InputFormat> inputBundle;
- private FileSystem fileSystem;
public FileSourceImpl(Path path, PType<T> ptype, Class<? extends
InputFormat> inputFormatClass) {
this(path, ptype, FormatBundle.forInput(inputFormatClass));
@@ -91,7 +90,7 @@ public class FileSourceImpl<T> implements ReadableSource<T> {
@Override
public FileSystem getFileSystem() {
- return fileSystem;
+ return inputBundle.getFileSystem();
}
@Override
@@ -102,23 +101,17 @@ public class FileSourceImpl<T> implements
ReadableSource<T> {
@Override
public Source<T> fileSystem(FileSystem fileSystem) {
- if (this.fileSystem != null) {
+ if (inputBundle.getFileSystem() != null) {
throw new IllegalStateException("Filesystem already set. Change is not supported.");
}
- this.fileSystem = fileSystem;
-
if (fileSystem != null) {
List<Path> qualifiedPaths = new ArrayList<>(paths.size());
for (Path path : paths) {
qualifiedPaths.add(fileSystem.makeQualified(path));
}
paths = qualifiedPaths;
-
- Configuration fsConf = fileSystem.getConf();
- for (Entry<String, String> entry : fsConf) {
- inputBundle.set(entry.getKey(), entry.getValue());
- }
+ inputBundle.setFileSystem(fileSystem);
}
return this;
}
diff --git
a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index d48ac31..fc3d2a8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -69,7 +69,6 @@ public class FileTargetImpl implements PathTarget {
protected Path path;
private final FormatBundle<? extends FileOutputFormat> formatBundle;
private final FileNamingScheme fileNamingScheme;
- private FileSystem fileSystem;
public FileTargetImpl(Path path, Class<? extends FileOutputFormat>
outputFormatClass,
FileNamingScheme fileNamingScheme) {
@@ -96,26 +95,21 @@ public class FileTargetImpl implements PathTarget {
@Override
public Target fileSystem(FileSystem fileSystem) {
- if (this.fileSystem != null) {
+ if (formatBundle.getFileSystem() != null) {
throw new IllegalStateException("Filesystem already set. Change is not supported.");
}
if (fileSystem != null) {
path = fileSystem.makeQualified(path);
- this.fileSystem = fileSystem;
-
- Configuration fsConf = fileSystem.getConf();
- for (Entry<String, String> entry : fsConf) {
- formatBundle.set(entry.getKey(), entry.getValue());
- }
+ formatBundle.setFileSystem(fileSystem);
}
return this;
}
@Override
public FileSystem getFileSystem() {
- return fileSystem;
+ return formatBundle.getFileSystem();
}
@Override
diff --git
a/crunch-core/src/test/java/org/apache/crunch/io/FormatBundleTest.java
b/crunch-core/src/test/java/org/apache/crunch/io/FormatBundleTest.java
new file mode 100644
index 0000000..7949a52
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/FormatBundleTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FormatBundleTest {
+ @Test
+ public void testFileSystemConfs() throws Exception {
+ Configuration fsConf = new Configuration(false);
+ fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///tmp/foo");
+ fsConf.set("foo", "bar");
+ fsConf.set("fs.fake.impl", "FakeFileSystem");
+ fsConf.set("dfs.overridden", "fsValue");
+ fsConf.set("dfs.extraOverridden", "fsExtra");
+ fsConf.set(DFSConfigKeys.DFS_NAMESERVICES, "fs-cluster");
+
+ FileSystem fs = FileSystem.newInstance(fsConf);
+
+ FormatBundle<TextInputFormat> formatBundle = new
FormatBundle<>(TextInputFormat.class);
+ formatBundle.setFileSystem(fs);
+ formatBundle.set("dfs.extraOverridden", "extraExtra");
+
+ Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_NAMESERVICES, "pipeline-cluster");
+ conf.set("dfs.overridden", "pipelineValue");
+ formatBundle.configure(conf);
+
+ // should be filtered by blacklist
+ Assert.assertFalse(conf.get(FileSystem.FS_DEFAULT_NAME_KEY).equals("hdfs://my-hdfs"));
+
+ // shouldn't be on whitelist
+ Assert.assertFalse(conf.get("foo") != null);
+
+ // should get through both blacklist and whitelist
+ Assert.assertEquals("FakeFileSystem", conf.get("fs.fake.impl"));
+
+ // should use value from fsConf
+ Assert.assertEquals("fsValue", conf.get("dfs.overridden"));
+
+ // should use value from 'extraConf'
+ Assert.assertEquals("extraExtra", conf.get("dfs.extraOverridden"));
+
+ // dfs.nameservices should be merged
+ Assert.assertArrayEquals(new String [] {"pipeline-cluster", "fs-cluster"},
+ conf.getStrings(DFSConfigKeys.DFS_NAMESERVICES));
+ }
+ @Test
+ public void testRedactedFileSystemConfs() throws Exception {
+ Configuration fsConf = new Configuration(false);
+ fsConf.set("fs.s3a.access.key", "accessKey");
+ fsConf.set("fs.s3a.secret.key", "secretKey");
+ fsConf.set("fs.fake.impl", "FakeFileSystem");
+ FileSystem fs = FileSystem.newInstance(fsConf);
+
+ FormatBundle<TextInputFormat> formatBundle = new
FormatBundle<>(TextInputFormat.class);
+ formatBundle.setFileSystem(fs);
+
+ Configuration conf = new Configuration();
+ conf.set("mapreduce.job.redacted-properties",
"fs.s3a.access.key,fs.s3a.secret.key");
+
+ final FormatBundleTestAppender appender = new FormatBundleTestAppender();
+ final Logger logger = Logger.getRootLogger();
+ logger.addAppender(appender);
+ try {
+ Logger.getLogger(FormatBundleTest.class);
+ formatBundle.configure(conf);
+ } finally {
+ logger.removeAppender(appender);
+ }
+
+ final List<LoggingEvent> log = appender.getLog();
+
+ // redacted value: accessKey
+ Assert.assertThat(log.get(0).getMessage().toString(),
+ is("Applied fs.s3a.access.key=*********(redacted) from FS 'file:///'"));
+
+ // fake non redacted value: fs.fake.impl
+ Assert.assertThat(log.get(1).getMessage().toString(),
+ is("Applied fs.fake.impl=FakeFileSystem from FS 'file:///'"));
+
+ // redacted value: secretKey
+ Assert.assertThat(log.get(2).getMessage().toString(),
+ is("Applied fs.s3a.secret.key=*********(redacted) from FS 'file:///'"));
+ }
+
+
+ class FormatBundleTestAppender extends AppenderSkeleton {
+
+ private final List<LoggingEvent> log = new ArrayList<>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(final LoggingEvent loggingEvent) {
+ log.add(loggingEvent);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public List<LoggingEvent> getLog() {
+ return new ArrayList<>(log);
+ }
+ }
+
+}
\ No newline at end of file