Repository: nifi
Updated Branches:
refs/heads/master c7f770422 -> 58aedd785
NIFI-866: Add Kerberos Support for Hadoop
- Add krb5.conf to nifi.properties
nifi.kerberos.krb5.file | path to krb5.conf
- Whether a connection to a Hadoop cluster is treated as secure is determined by
the Hadoop configuration: hadoop.security.authentication must be set to kerberos.
- Added two optional arguments to AbstractHadoopProcessor (principal and keytab).
These are only required if the cluster you're connecting to is secured. Both of
these options require the path to krb5.conf to be set in nifi.properties
(nifi.kerberos.krb5.file).
Signed-off-by: Bryan Bende <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7fb6e884
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7fb6e884
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7fb6e884
Branch: refs/heads/master
Commit: 7fb6e884a72acfd32c97b96058909c0cf6434061
Parents: c7f7704
Author: ricky <[email protected]>
Authored: Thu Aug 13 14:54:15 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Mon Aug 24 15:36:22 2015 -0400
----------------------------------------------------------------------
nifi-assembly/pom.xml | 3 +
.../org/apache/nifi/util/NiFiProperties.java | 11 +
.../src/main/resources/conf/nifi.properties | 3 +
.../nifi-hdfs-processors/pom.xml | 4 +
.../hadoop/AbstractHadoopProcessor.java | 217 +++++++++++++------
.../processors/hadoop/AbstractHadoopTest.java | 46 +++-
6 files changed, 206 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fb6e884/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 0431a76..3b42802 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -370,6 +370,9 @@ language governing permissions and limitations under the
License. -->
<nifi.cluster.manager.flow.retrieval.delay>5
sec</nifi.cluster.manager.flow.retrieval.delay>
<nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
<nifi.cluster.manager.safemode.duration>0
sec</nifi.cluster.manager.safemode.duration>
+
+ <!-- nifi.properties: kerberos properties -->
+ <nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
</properties>
<profiles>
<profile>
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fb6e884/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 61199c4..5ee6c13 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -180,6 +180,9 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_MANAGER_PROTOCOL_THREADS =
"nifi.cluster.manager.protocol.threads";
public static final String CLUSTER_MANAGER_SAFEMODE_DURATION =
"nifi.cluster.manager.safemode.duration";
+ // kerberos properties
+ public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
+
// defaults
public static final String DEFAULT_TITLE = "NiFi";
public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
@@ -799,6 +802,14 @@ public class NiFiProperties extends Properties {
}
}
+ /**
+ * Returns the Kerberos configuration file named by the nifi.kerberos.krb5.file property.
+ *
+ * @return the krb5 configuration file, or null if the property is missing or blank
+ */
+ public File getKerberosConfigurationFile() {
+ // getProperty returns null when the key is absent (e.g. an older nifi.properties),
+ // so guard against null before trimming
+ final String krb5Path = getProperty(KERBEROS_KRB5_FILE);
+ if (krb5Path == null || krb5Path.trim().isEmpty()) {
+ return null;
+ }
+ // use the trimmed value so stray whitespace in the properties file does not end up in the path
+ return new File(krb5Path.trim());
+ }
+
public InetSocketAddress getNodeApiAddress() {
final String rawScheme = getClusterProtocolManagerToNodeApiScheme();
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fb6e884/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 6eae46b..54b5283 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -162,3 +162,6 @@
nifi.cluster.manager.node.api.request.threads=${nifi.cluster.manager.node.api.re
nifi.cluster.manager.flow.retrieval.delay=${nifi.cluster.manager.flow.retrieval.delay}
nifi.cluster.manager.protocol.threads=${nifi.cluster.manager.protocol.threads}
nifi.cluster.manager.safemode.duration=${nifi.cluster.manager.safemode.duration}
+
+# kerberos #
+nifi.kerberos.krb5.file=${nifi.kerberos.krb5.file}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fb6e884/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 6bc110f..04a1534 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -61,5 +61,9 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-properties</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fb6e884/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 548d34c..8519d2c 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -28,16 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.util.Tuple;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -50,36 +41,89 @@ import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.Tuple;
/**
* This is a base class that is helpful when building processors interacting
with HDFS.
*/
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+ /**
+ * Validates the Kerberos related properties: both the keytab and the principal must be set, and the
+ * Kerberos configuration file returned by NIFI_PROPERTIES must be set and readable.
+ */
+ private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ // Check that both the principal & keytab are set before checking the kerberos config
+ if (context.getProperty(KERBEROS_KEYTAB).getValue() == null
+ || context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) {
+ return invalid(subject, input, "both keytab and principal must be set in order to use Kerberos authentication");
+ }
+
+ // NIFI_PROPERTIES is null when nifi.properties could not be loaded; treat that the same as the
+ // krb5 property not being set instead of failing with a NullPointerException.
+ // The file is resolved once so that all checks below look at the same value.
+ final File krb5Conf = NIFI_PROPERTIES == null ? null : NIFI_PROPERTIES.getKerberosConfigurationFile();
+
+ // Check that the Kerberos configuration is set
+ if (krb5Conf == null) {
+ return invalid(subject, input,
+ "you are missing the nifi.kerberos.krb5.file property in nifi.properties. " + "This must be set in order to use Kerberos");
+ }
+
+ // Check that the Kerberos configuration is readable
+ if (!krb5Conf.canRead()) {
+ return invalid(subject, input,
+ String.format("unable to read Kerberos config [%s], please make sure the path is valid " + "and nifi has adequate permissions",
+ krb5Conf.getAbsoluteFile()));
+ }
+ return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+ }
+
+ // Builds a failed validation result carrying the given explanation
+ private ValidationResult invalid(String subject, String input, String explanation) {
+ return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(explanation).build();
+ }
+ };
// properties
- public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES =
new PropertyDescriptor.Builder()
- .name("Hadoop Configuration Resources")
+ public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES =
new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
.description("A file or comma separated list of files which
contains the Hadoop file system configuration. Without this, Hadoop "
+ "will search the classpath for a 'core-site.xml' and
'hdfs-site.xml' file or will revert to a default configuration.")
- .required(false)
- .addValidator(createMultipleFilesExistValidator())
- .build();
+
.required(false).addValidator(createMultipleFilesExistValidator()).build();
+
+ public static NiFiProperties NIFI_PROPERTIES = null;
public static final String DIRECTORY_PROP_NAME = "Directory";
- public static final PropertyDescriptor COMPRESSION_CODEC = new
PropertyDescriptor.Builder()
- .name("Compression codec")
- .required(false)
- .allowableValues(BZip2Codec.class.getName(),
DefaultCodec.class.getName(),
- GzipCodec.class.getName(), Lz4Codec.class.getName(),
SnappyCodec.class.getName())
- .build();
+ public static final PropertyDescriptor COMPRESSION_CODEC = new
PropertyDescriptor.Builder().name("Compression codec").required(false)
+ .allowableValues(BZip2Codec.class.getName(),
DefaultCodec.class.getName(), GzipCodec.class.getName(),
Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+
+ public static final PropertyDescriptor KERBEROS_PRINCIPAL = new
PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
+ .description("Kerberos principal to authenticate as. Requires
nifi.kerberos.krb5.file to be set " + "in your
nifi.properties").addValidator(Validator.VALID)
+ .addValidator(KERBEROS_CONFIG_VALIDATOR).build();
+
+ public static final PropertyDescriptor KERBEROS_KEYTAB = new
PropertyDescriptor.Builder().name("Kerberos Keytab").required(false)
+ .description("Kerberos keytab associated with the principal.
Requires nifi.kerberos.krb5.file to be set " + "in your
nifi.properties").addValidator(Validator.VALID)
+
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
protected static final List<PropertyDescriptor> properties;
+ private static final Object RESOURCES_LOCK = new Object();
+
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HADOOP_CONFIGURATION_RESOURCES);
+ props.add(KERBEROS_PRINCIPAL);
+ props.add(KERBEROS_KEYTAB);
properties = Collections.unmodifiableList(props);
+ try {
+ NIFI_PROPERTIES = NiFiProperties.getInstance();
+ } catch (Exception e) {
+ // This will happen during tests
+ NIFI_PROPERTIES = null;
+ }
+ if (NIFI_PROPERTIES != null &&
NIFI_PROPERTIES.getKerberosConfigurationFile() != null) {
+ System.setProperty("java.security.krb5.conf",
NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath());
+ }
}
// variables shared by all threads of this processor
@@ -97,8 +141,7 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
}
/*
- * If your subclass also has an @OnScheduled annotated method and you need
hdfsResources in that method, then be sure to
- * call super.abstractOnScheduled(context)
+ * If your subclass also has an @OnScheduled annotated method and you need
hdfsResources in that method, then be sure to call
super.abstractOnScheduled(context)
*/
@OnScheduled
public final void abstractOnScheduled(ProcessContext context) throws
IOException {
@@ -108,11 +151,11 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
String configResources =
context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
String dir =
context.getProperty(DIRECTORY_PROP_NAME).getValue();
dir = dir == null ? "/" : dir;
- resources = resetHDFSResources(configResources, dir);
+ resources = resetHDFSResources(configResources, dir, context);
hdfsResources.set(resources);
}
} catch (IOException ex) {
- getLogger().error("HDFS Configuration error - {}", new
Object[]{ex});
+ getLogger().error("HDFS Configuration error - {}", new Object[] {
ex });
hdfsResources.set(new Tuple<Configuration, FileSystem>(null,
null));
throw ex;
}
@@ -123,10 +166,38 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
}
+ /**
+ * Builds a Hadoop Configuration from the supplied resources.
+ *
+ * @param configResources
+ * a file or comma separated list of files to add to the configuration, may be null
+ * @return the Configuration with every non-blank resource added
+ * @throws IOException
+ * if no resource was supplied and no non-default resource is available on the classpath
+ */
+ private static Configuration getConfigurationFromResources(String configResources) throws IOException {
+ boolean foundResources = false;
+ final Configuration config = new Configuration();
+ if (null != configResources) {
+ for (String resource : configResources.split(",")) {
+ final String trimmed = resource.trim();
+ // skip blank entries (e.g. "a.xml,,b.xml"): new Path("") throws an IllegalArgumentException
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ config.addResource(new Path(trimmed));
+ foundResources = true;
+ }
+ }
+
+ if (!foundResources) {
+ // check that at least 1 non-default resource is available on the classpath;
+ // the resource names are taken from the text after the first ':' of config.toString()
+ String configStr = config.toString();
+ for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
+ if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
+ foundResources = true;
+ break;
+ }
+ }
+ }
+
+ if (!foundResources) {
+ throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
+ }
+ return config;
+ }
+
/*
* Reset Hadoop Configuration and FileSystem based on the supplied
configuration resources.
*/
- Tuple<Configuration, FileSystem> resetHDFSResources(String
configResources, String dir) throws IOException {
+ Tuple<Configuration, FileSystem> resetHDFSResources(String
configResources, String dir, ProcessContext context) throws IOException {
// org.apache.hadoop.conf.Configuration saves its current thread
context class loader to use for threads that it creates
// later to do I/O. We need this class loader to be the NarClassLoader
instead of the magical
// NarThreadContextClassLoader.
@@ -134,30 +205,7 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try {
- boolean foundResources = false;
- final Configuration config = new Configuration();
- if (null != configResources) {
- String[] resources = configResources.split(",");
- for (String resource : resources) {
- config.addResource(new Path(resource.trim()));
- foundResources = true;
- }
- }
-
- if (!foundResources) {
- // check that at least 1 non-default resource is available on
the classpath
- String configStr = config.toString();
- for (String resource :
configStr.substring(configStr.indexOf(":") + 1).split(",")) {
- if (!resource.contains("default") &&
config.getResource(resource.trim()) != null) {
- foundResources = true;
- break;
- }
- }
- }
-
- if (!foundResources) {
- throw new IOException("Could not find any of the " +
HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
- }
+ Configuration config =
getConfigurationFromResources(configResources);
// first check for timeout on HDFS connection, because FileSystem
has a hard coded 15 minute timeout
checkHdfsUriForTimeout(config);
@@ -165,13 +213,26 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
// disable caching of Configuration and FileSystem objects, else
we cannot reconfigure the processor without a complete
// restart
String disableCacheName =
String.format("fs.%s.impl.disable.cache",
FileSystem.getDefaultUri(config).getScheme());
- config.set(disableCacheName, "true");
- final FileSystem fs = getFileSystem(config);
- getLogger().info(
- "Initialized a new HDFS File System with working dir: {}
default block size: {} default replication: {} config: {}",
- new Object[]{fs.getWorkingDirectory(),
fs.getDefaultBlockSize(new Path(dir)),
- fs.getDefaultReplication(new Path(dir)),
config.toString()});
+ // If kerberos is enabled, create the file system as the kerberos principal
+ // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+ FileSystem fs = null;
+ synchronized (RESOURCES_LOCK) {
+ if
(config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
+ String principal =
context.getProperty(KERBEROS_PRINCIPAL).getValue();
+ String keyTab =
context.getProperty(KERBEROS_KEYTAB).getValue();
+ UserGroupInformation.setConfiguration(config);
+ UserGroupInformation ugi =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+ fs = getFileSystemAsUser(config, ugi);
+ } else {
+ config.set("ipc.client.fallback-to-simple-auth-allowed",
"true");
+ config.set("hadoop.security.authentication", "simple");
+ fs = getFileSystem(config);
+ }
+ }
+ config.set(disableCacheName, "true");
+ getLogger().info("Initialized a new HDFS File System with working
dir: {} default block size: {} default replication: {} config: {}",
+ new Object[] { fs.getWorkingDirectory(),
fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)),
config.toString() });
return new Tuple<>(config, fs);
} finally {
@@ -180,17 +241,31 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
}
/**
- * This exists in order to allow unit tests to override it so that they
don't take several minutes waiting
- * for UDP packets to be received
+ * This exists in order to allow unit tests to override it so that they
don't take several minutes waiting for UDP packets to be received
*
- * @param config the configuration to use
+ * @param config
+ * the configuration to use
* @return the FileSystem that is created for the given Configuration
- * @throws IOException if unable to create the FileSystem
+ * @throws IOException
+ * if unable to create the FileSystem
*/
protected FileSystem getFileSystem(final Configuration config) throws
IOException {
return FileSystem.get(config);
}
+ /**
+ * Creates the FileSystem for the given Configuration while running as the given user.
+ *
+ * @param config
+ * the configuration to use
+ * @param ugi
+ * the user to create the FileSystem as
+ * @return the FileSystem that is created for the given Configuration
+ * @throws IOException
+ * if unable to create the FileSystem, or if the thread is interrupted while creating it
+ */
+ protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws Exception {
+ return FileSystem.get(config);
+ }
+ });
+ } catch (InterruptedException e) {
+ // restore the interrupt flag so code further up the stack can still observe the interruption
+ Thread.currentThread().interrupt();
+ // keep the original exception as the cause so the stack trace is not lost
+ throw new IOException("Unable to create file system: " + e.getMessage(), e);
+ }
+ }
+
/*
* Drastically reduce the timeout of a socket connection from the default
in FileSystem.get()
*/
@@ -227,13 +302,11 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
final boolean valid = file.exists() && file.isFile();
if (!valid) {
final String message = "File " + file + " does not
exist or is not a file";
- return new
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
- .build();
+ return new
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
} catch (SecurityException e) {
final String message = "Unable to access " + filename
+ " due to " + e.getMessage();
- return new
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message)
- .build();
+ return new
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
}
return new
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
@@ -245,8 +318,10 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
/**
* Returns the configured CompressionCodec, or null if none is configured.
*
- * @param context the ProcessContext
- * @param configuration the Hadoop Configuration
+ * @param context
+ * the ProcessContext
+ * @param configuration
+ * the Hadoop Configuration
* @return CompressionCodec or null
*/
protected CompressionCodec getCompressionCodec(ProcessContext context,
Configuration configuration) {
@@ -261,10 +336,12 @@ public abstract class AbstractHadoopProcessor extends
AbstractProcessor {
}
/**
- * Returns the relative path of the child that does not include the
filename
- * or the root path.
- * @param root the path to relativize from
- * @param child the path to relativize
+ * Returns the relative path of the child that does not include the
filename or the root path.
+ *
+ * @param root
+ * the path to relativize from
+ * @param child
+ * the path to relativize
* @return the relative path
*/
public static String getPathDifference(final Path root, final Path child) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/7fb6e884/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index e1b7827..c47c7a0 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -16,24 +16,26 @@
*/
package org.apache.nifi.processors.hadoop;
-import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class AbstractHadoopTest {
private static Logger logger;
@@ -82,9 +84,37 @@ public class AbstractHadoopTest {
TestRunner runner =
TestRunners.newTestRunner(SimpleHadoopProcessor.class);
SimpleHadoopProcessor processor = (SimpleHadoopProcessor)
runner.getProcessor();
try {
-
processor.resetHDFSResources("src/test/resources/core-site-broken.xml",
"/target");
+
processor.resetHDFSResources("src/test/resources/core-site-broken.xml",
"/target", runner.getProcessContext());
Assert.fail("Should have thrown SocketTimeoutException");
} catch (IOException e) {
}
}
+
+ /**
+ * Verifies validation of the Kerberos principal/keytab properties against a mocked NiFiProperties.
+ */
+ @Test
+ public void testKerberosOptions() throws Exception {
+ // NIFI_PROPERTIES is a mutable static field: remember its value so the mock installed below
+ // does not leak into tests that run after this one
+ final NiFiProperties originalProperties = SimpleHadoopProcessor.NIFI_PROPERTIES;
+ File temporaryFile = File.createTempFile("hadoop-test", ".properties");
+ try {
+ // mock properties and return a temporary file for the kerberos configuration
+ NiFiProperties mockedProperties = mock(NiFiProperties.class);
+ when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile);
+ SimpleHadoopProcessor.NIFI_PROPERTIES = mockedProperties;
+ TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class);
+ // should be valid since no kerberos options specified
+ runner.assertValid();
+ // no longer valid since only the principal is provided
+ runner.setProperty(SimpleHadoopProcessor.KERBEROS_PRINCIPAL, "principal");
+ runner.assertNotValid();
+ // invalid since the keytab does not exist
+ runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, "BAD_KEYTAB_PATH");
+ runner.assertNotValid();
+ // valid since keytab is now a valid file location
+ runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, temporaryFile.getAbsolutePath());
+ runner.assertValid();
+ // invalid since the kerberos configuration was changed to a non-existent file
+ when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH"));
+ runner.assertNotValid();
+ } finally {
+ SimpleHadoopProcessor.NIFI_PROPERTIES = originalProperties;
+ temporaryFile.delete();
+ }
+ }
}