Repository: nifi Updated Branches: refs/heads/master 94cf1beeb -> 8773ec3d3
http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java index c47c7a0..76fc15d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java @@ -17,12 +17,15 @@ package org.apache.nifi.processors.hadoop; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -40,17 +43,42 @@ public class AbstractHadoopTest { private static Logger logger; + private File temporaryFile; + private KerberosProperties kerberosProperties; + private NiFiProperties mockedProperties; + @BeforeClass - public static void setUpClass() { + public static void setUpClass() throws IOException { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug"); logger = LoggerFactory.getLogger(AbstractHadoopTest.class); } + @Before + public void setup() throws IOException { + // needed for calls to UserGroupInformation.setConfiguration() to work when passing in + // config with Kerberos authentication enabled + System.setProperty("java.security.krb5.realm", "nifi.com"); + System.setProperty("java.security.krb5.kdc", "nifi.kdc"); + + temporaryFile = File.createTempFile("hadoop-test", ".properties"); + + // mock properties and return a temporary file for the kerberos configuration + mockedProperties = mock(NiFiProperties.class); + when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile); + kerberosProperties = KerberosProperties.create(mockedProperties); + } + + @After + public void cleanUp() { + temporaryFile.delete(); + } + @Test public void testErrorConditions() { - TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class); + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); Collection<ValidationResult> results; ProcessContext pc; @@ -81,8 +109,8 @@ public class AbstractHadoopTest { @Test public void testTimeoutDetection() throws Exception { - TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class); - SimpleHadoopProcessor processor = (SimpleHadoopProcessor) runner.getProcessor(); + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); try { processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext()); Assert.fail("Should have thrown SocketTimeoutException"); @@ -92,29 +120,36 @@ public class AbstractHadoopTest { @Test public void testKerberosOptions() throws Exception { - File temporaryFile = File.createTempFile("hadoop-test", ".properties"); - try { - // mock properties and return a temporary file for the kerberos configuration - NiFiProperties mockedProperties = mock(NiFiProperties.class); - when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile); - SimpleHadoopProcessor.NIFI_PROPERTIES = mockedProperties; - TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class); - // should be valid since no kerberos options specified - runner.assertValid(); - // no longer valid since only the principal is provided - runner.setProperty(SimpleHadoopProcessor.KERBEROS_PRINCIPAL, "principal"); - runner.assertNotValid(); - // invalid since the keytab does not exist - runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, "BAD_KEYTAB_PATH"); - runner.assertNotValid(); - // valid since keytab is now a valid file location - runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, temporaryFile.getAbsolutePath()); - runner.assertValid(); - // invalid since the kerberos configuration was changed to a non-existent file - when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH")); - runner.assertNotValid(); - } finally { - temporaryFile.delete(); - } + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); + // should be valid since no kerberos options specified + runner.assertValid(); + // no longer valid since only the principal is provided + runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml"); + runner.setProperty(kerberosProperties.getKerberosPrincipal(), "principal"); + runner.assertNotValid(); + // invalid since the keytab does not exist + runner.setProperty(kerberosProperties.getKerberosKeytab(), "BAD_KEYTAB_PATH"); + runner.assertNotValid(); + // valid since keytab is now a valid file location + runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath()); + runner.assertValid(); + } + + @Test + public void testKerberosOptionsWithBadKerberosConfigFile() throws Exception { + // invalid since the kerberos configuration was changed to a non-existent file + when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH")); + kerberosProperties = KerberosProperties.create(mockedProperties); + + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); + runner.assertValid(); + + runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml"); + runner.setProperty(kerberosProperties.getKerberosPrincipal(), "principal"); + runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath()); + runner.assertNotValid(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java index e8714dd..64fe16f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java @@ -16,28 +16,43 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; - +import org.apache.hadoop.fs.Path; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.apache.hadoop.fs.Path; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class GetHDFSTest { + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + + @Before + public void setup() { + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + } + @Test public void getPathDifferenceTest() { Assert.assertEquals("", GetHDFS.getPathDifference(new Path("/root"), new Path("/file"))); @@ -69,7 +84,8 @@ public class GetHDFSTest { @Test public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); Collection<ValidationResult> results; ProcessContext pc; @@ -110,7 +126,8 @@ public class GetHDFSTest { @Test public void testGetFilesWithFilter() { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -124,7 +141,8 @@ public class GetHDFSTest { @Test public void testAutomaticDecompression() throws IOException { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -142,7 +160,8 @@ public class GetHDFSTest { @Test public void testInferCompressionCodecDisabled() throws IOException { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -160,7 +179,8 @@ public class GetHDFSTest { @Test public void testFileExtensionNotACompressionCodec() throws IOException { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -175,4 +195,19 @@ public class GetHDFSTest { InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip"); flowFile.assertContentEquals(expected); } + + private static class TestableGetHDFS extends GetHDFS { + + private final KerberosProperties testKerberosProperties; + + public TestableGetHDFS(KerberosProperties testKerberosProperties) { + this.testKerberosProperties = testKerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index c524a44..76970ed 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -16,41 +16,49 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class PutHDFSTest { + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + @BeforeClass - public static void setUp() throws Exception{ + public static void setUpClass() throws Exception{ /* * Running Hadoop on Windows requires a special build which will produce required binaries and native modules [1]. Since functionality * provided by this module and validated by these test does not have any native implication we do not distribute required binaries and native modules @@ -61,9 +69,17 @@ public class PutHDFSTest { */ } + @Before + public void setup() { + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + } + @Test public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + PutHDFS proc = new TestablePutHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); Collection<ValidationResult> results; ProcessContext pc; @@ -100,7 +116,8 @@ public class PutHDFSTest { assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero")); } - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.REPLICATION_FACTOR, "0"); @@ -114,7 +131,8 @@ public class PutHDFSTest { assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero")); } - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.UMASK, "-1"); @@ -128,7 +146,8 @@ public class PutHDFSTest { assertTrue(vr.toString().contains("is invalid because octal umask [-1] cannot be negative")); } - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.UMASK, "18"); @@ -156,7 +175,8 @@ public class PutHDFSTest { } results = new HashSet<>(); - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName()); runner.enqueue(new byte[0]); @@ -175,7 +195,8 @@ public class PutHDFSTest { // Refer to comment in the BeforeClass method for an explanation assumeTrue(isNotWindows()); - TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + PutHDFS proc = new TestablePutHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setValidateExpressionUsage(false); @@ -208,11 +229,17 @@ public class PutHDFSTest { FileSystem fs = FileSystem.get(config); Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); + final KerberosProperties testKerberosProperties = kerberosProperties; TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { @Override protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) { throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling"); } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } }); runner.setProperty(PutHDFS.DIRECTORY, dirName); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); @@ -236,4 +263,19 @@ public class PutHDFSTest { private boolean isNotWindows() { return !System.getProperty("os.name").startsWith("Windows"); } + + private static class TestablePutHDFS extends PutHDFS { + + private KerberosProperties testKerberosProperties; + + public TestablePutHDFS(KerberosProperties testKerberosProperties) { + this.testKerberosProperties = testKerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java index 905460f..4d382eb 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java @@ -16,15 +16,26 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; public class SimpleHadoopProcessor extends AbstractHadoopProcessor { + private KerberosProperties testKerberosProperties; + + public SimpleHadoopProcessor(KerberosProperties kerberosProperties) { + this.testKerberosProperties = kerberosProperties; + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { } + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java index 5357dff..e568dfb 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java @@ -16,25 +16,14 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -42,6 +31,19 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestCreateHadoopSequenceFile { private TestRunner controller; @@ -52,6 +54,9 @@ public class TestCreateHadoopSequenceFile { new File(testdata, "randombytes-2"), new File(testdata, "randombytes-3") }; + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + @BeforeClass public static void setUpClass() { LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class); @@ -61,7 +66,12 @@ public class TestCreateHadoopSequenceFile { @Before public void setUp() { - controller = TestRunners.newTestRunner(CreateHadoopSequenceFile.class); + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + + CreateHadoopSequenceFile proc = new TestableCreateHadoopSequenceFile(kerberosProperties); + controller = TestRunners.newTestRunner(proc); } @After @@ -183,4 +193,17 @@ public class TestCreateHadoopSequenceFile { // fos.close(); } + private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile { + + private KerberosProperties testKerbersProperties; + + public TestableCreateHadoopSequenceFile(KerberosProperties testKerbersProperties) { + this.testKerbersProperties = testKerbersProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerbersProperties; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index 7a77f06..6fcea95 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -16,21 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -44,23 +29,48 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestListHDFS { private TestRunner runner; private ListHDFSWithMockedFileSystem proc; private MockCacheClient service; + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; @Before public void setup() throws InitializationException { - proc = new ListHDFSWithMockedFileSystem(); + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + + proc = new ListHDFSWithMockedFileSystem(kerberosProperties); runner = TestRunners.newTestRunner(proc); service = new MockCacheClient(); @@ -250,6 +260,16 @@ public class TestListHDFS { private class ListHDFSWithMockedFileSystem extends ListHDFS { private final MockFileSystem fileSystem = new MockFileSystem(); + private final KerberosProperties testKerberosProps; + + public ListHDFSWithMockedFileSystem(KerberosProperties kerberosProperties) { + this.testKerberosProps = kerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProps; + } @Override protected FileSystem getFileSystem() { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml index e06a193..fd849e3 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml @@ -1,4 +1,5 @@ <?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with @@ -13,7 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. --> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml new file mode 100644 index 0000000..2aca105 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml @@ -0,0 +1,30 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> + <property> + <name>fs.default.name</name> + <value>hdfs://hbase</value> + </property> + <property> + <name>hadoop.security.authentication</name> + <value>kerberos</value> + </property> + <property> + <name>hadoop.security.authorization</name> + <value>true</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml index 5e3b55c..7f01a9f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml @@ -1,4 +1,5 @@ <?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with @@ -13,7 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. --> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index fa4d80a..3cd81a3 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -453,10 +453,12 @@ public class GetHBase extends AbstractProcessor { localState.delete(); } - try { - client.remove(getKey(), new StringSerDe()); - } catch (IOException e) { - getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e}); + if (client != null) { + try { + client.remove(getKey(), new StringSerDe()); + } catch (IOException e) { + getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e}); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index d83e9d6..2f5b6a5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -42,18 +42,6 @@ public interface HBaseClientService extends ControllerService { .addValidator(new ConfigFilesValidator()) .build(); - PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() - .name("kerberos-principal").displayName("Kerberos Principal") - .description("Principal of user writing to hbase").required(false) - .addValidator(StandardValidators.KERB_PRINC_VALIDATOR) - .build(); - - PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder() - .name("kerberos-keytab").displayName("Kerberos Keytab") - .description("Path to keytab file").required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder() .name("ZooKeeper Quorum") .description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.") http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml index 5c2314f..50b1005 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml @@ -41,7 +41,10 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> </dependency> - + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-utils</artifactId> + </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index e31a60f..3465135 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -44,6 +44,9 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.KerberosTicketRenewer; +import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; @@ -51,9 +54,11 @@ import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NiFiProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,9 +66,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - @Tags({ "hbase", "client"}) @CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " + "a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " + @@ -73,23 +75,29 @@ import org.slf4j.LoggerFactory; @DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.", description="These properties will be set on the HBase configuration after loading any provided configuration files.") public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService { - private static final Logger LOG = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class); + static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum"; static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort"; static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent"; static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number"; + static final long TICKET_RENEWAL_PERIOD = 60000; + private volatile Connection connection; - private List<PropertyDescriptor> properties; + private volatile UserGroupInformation ugi; + private volatile KerberosTicketRenewer renewer; - protected boolean isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + private List<PropertyDescriptor> properties; + private KerberosProperties kerberosProperties; @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { + this.kerberosProperties = getKerberosProperties(); + List<PropertyDescriptor> props = new ArrayList<>(); props.add(HADOOP_CONF_FILES); - props.add(KERBEROS_PRINCIPAL); - props.add(KERBEROS_KEYTAB); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); props.add(ZOOKEEPER_QUORUM); props.add(ZOOKEEPER_CLIENT_PORT); props.add(ZOOKEEPER_ZNODE_PARENT); @@ -97,6 +105,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme this.properties = Collections.unmodifiableList(props); } + protected KerberosProperties getKerberosProperties() { + return KerberosProperties.create(NiFiProperties.getInstance()); + } + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; @@ -119,8 +131,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet(); boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet(); boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet(); - boolean kerbprincProvided = validationContext.getProperty(KERBEROS_PRINCIPAL).isSet(); - boolean kerbkeytabProvided = validationContext.getProperty(KERBEROS_KEYTAB).isSet(); final List<ValidationResult> problems = new ArrayList<>(); @@ -133,19 +143,21 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme .build()); } - if (isSecurityEnabled && (!kerbprincProvided || !kerbkeytabProvided)) { - problems.add(new ValidationResult.Builder().valid(false) - .subject(this.getClass().getSimpleName()).explanation("Kerberos" + - " principal and keytab must be provided when using a secure " + - "hbase") - .build()); + if (confFileProvided) { + final String configFiles = validationContext.getProperty(HADOOP_CONF_FILES).getValue(); + final Configuration hbaseConfig = getConfigurationFromFiles(configFiles); + final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + problems.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), hbaseConfig, principal, keytab, getLogger())); } return problems; } @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException { + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { this.connection = createConnection(context); // connection check @@ -154,18 +166,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme if (admin != null) { admin.listTableNames(); } - } - } - - protected Connection createConnection(final ConfigurationContext context) throws IOException { - final Configuration hbaseConfig = HBaseConfiguration.create(); - // if conf files are provided, start with those - if (context.getProperty(HADOOP_CONF_FILES).isSet()) { - for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) { - hbaseConfig.addResource(new Path(configFile.trim())); + // if we got here then we have a successful connection, so if we have a ugi then start a renewer + if (ugi != null) { + final String id = getClass().getSimpleName(); + renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger()); } } + } + + protected Connection createConnection(final ConfigurationContext context) throws IOException, InterruptedException { + final String configFiles = context.getProperty(HADOOP_CONF_FILES).getValue(); + final Configuration hbaseConfig = getConfigurationFromFiles(configFiles); // override with any properties that are provided if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) { @@ -188,23 +200,45 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme hbaseConfig.set(descriptor.getName(), entry.getValue()); } } - UserGroupInformation.setConfiguration(hbaseConfig); - isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); - if (UserGroupInformation.isSecurityEnabled()) { - try{ - UserGroupInformation.loginUserFromKeytab(context.getProperty(KERBEROS_PRINCIPAL).getValue(), - context.getProperty(KERBEROS_KEYTAB).getValue()); - LOG.info("HBase Security Enabled, Logging in as User {}"); - } catch (Exception e) { - } + + if (SecurityUtil.isSecurityEnabled(hbaseConfig)) { + final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[] {principal, keyTab}); + ugi = SecurityUtil.loginKerberos(hbaseConfig, principal, keyTab); + getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {principal, keyTab}); + + return ugi.doAs(new PrivilegedExceptionAction<Connection>() { + @Override + public Connection run() throws Exception { + return ConnectionFactory.createConnection(hbaseConfig); + } + }); + } else { - LOG.info("Simple Authentication"); - } - return ConnectionFactory.createConnection(hbaseConfig); + getLogger().info("Simple Authentication"); + return ConnectionFactory.createConnection(hbaseConfig); + } + + } + + protected Configuration getConfigurationFromFiles(final String configFiles) { + final Configuration hbaseConfig = HBaseConfiguration.create(); + if (StringUtils.isNotBlank(configFiles)) { + for (final String configFile : configFiles.split(",")) { + hbaseConfig.addResource(new Path(configFile.trim())); + } + } + return hbaseConfig; } @OnDisabled public void shutdown() { + if (renewer != null) { + renewer.stop(); + } + if (connection != null) { try { connection.close(); @@ -216,7 +250,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme @Override public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException { - UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab(); try (final Table table = connection.getTable(TableName.valueOf(tableName))) { // Create one Put per row.... final Map<String, Put> rowPuts = new HashMap<>(); @@ -241,7 +274,6 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme @Override public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException { - UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab(); try (final Table table = connection.getTable(TableName.valueOf(tableName))) { Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8)); for (final PutColumn column : columns) { @@ -263,7 +295,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme ParseFilter parseFilter = new ParseFilter(); filter = parseFilter.parseFilterString(filterExpression); } - UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab(); + try (final Table table = connection.getTable(TableName.valueOf(tableName)); final ResultScanner scanner = getResults(table, columns, filter, minTime)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 58d9194..0854d28 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -25,18 +25,22 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -57,8 +61,27 @@ import static org.mockito.Mockito.when; public class TestHBase_1_1_2_ClientService { + private KerberosProperties kerberosPropsWithFile; + private KerberosProperties kerberosPropsWithoutFile; + + @Before + public void setup() { + // needed for calls to UserGroupInformation.setConfiguration() to work when passing in + // config with Kerberos authentication enabled + System.setProperty("java.security.krb5.realm", "nifi.com"); + System.setProperty("java.security.krb5.kdc", "nifi.kdc"); + + NiFiProperties niFiPropertiesWithKerberos = Mockito.mock(NiFiProperties.class); + when(niFiPropertiesWithKerberos.getKerberosConfigurationFile()).thenReturn(new File("src/test/resources/krb5.conf")); + kerberosPropsWithFile = KerberosProperties.create(niFiPropertiesWithKerberos); + + NiFiProperties niFiPropertiesWithoutKerberos = Mockito.mock(NiFiProperties.class); + when(niFiPropertiesWithKerberos.getKerberosConfigurationFile()).thenReturn(null); + kerberosPropsWithoutFile = KerberosProperties.create(niFiPropertiesWithoutKerberos); + } + @Test - public void testCustomValidate() throws InitializationException { + public void testCustomValidate() throws InitializationException, IOException { final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); final String tableName = "nifi"; @@ -66,7 +89,7 @@ public class TestHBase_1_1_2_ClientService { when(table.getName()).thenReturn(TableName.valueOf(tableName)); // no conf file or zk properties so should be invalid - MockHBaseClientService service = new MockHBaseClientService(table); + MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.enableControllerService(service); @@ -74,16 +97,16 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // conf file with no zk properties should be valid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); - runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml"); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.enableControllerService(service); runner.assertValid(service); runner.removeControllerService(service); // only quorum and no conf file should be invalid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.enableControllerService(service); @@ -92,7 +115,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // quorum and port, no znode, no conf file, should be invalid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); @@ -102,7 +125,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // quorum, port, and znode, no conf file, should be valid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); @@ -113,44 +136,60 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // quorum and port with conf file should be valid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); - runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml"); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); runner.enableControllerService(service); runner.assertValid(service); + runner.removeControllerService(service); - // Kerberos - principal with non-set keytab + // Kerberos - principal with non-set keytab and only hbase-site-security - valid because we need core-site-security to turn on security + service = new MockHBaseClientService(table, kerberosPropsWithFile); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml"); + runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM"); + runner.enableControllerService(service); + runner.assertValid(service); + + // Kerberos - principal with non-set keytab and both config files runner.disableControllerService(service); - service.setIsSecurityEnabled(true); - runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site-security.xml"); - runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "test@REALM"); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, + "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml"); runner.enableControllerService(service); runner.assertNotValid(service); // Kerberos - add valid options runner.disableControllerService(service); - runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/fake.keytab"); - runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "test@REALM"); + runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); + runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM"); runner.enableControllerService(service); runner.assertValid(service); // Kerberos - add invalid non-existent keytab file runner.disableControllerService(service); - runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/missing.keytab"); + runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/missing.keytab"); runner.enableControllerService(service); runner.assertNotValid(service); // Kerberos - add invalid principal runner.disableControllerService(service); - runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/fake.keytab"); - runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "invalid"); + runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); + runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), ""); runner.enableControllerService(service); runner.assertNotValid(service); - runner.removeControllerService(service); + // Kerberos - valid props but the KerberosProperties has a null Kerberos config file so be invalid + service = new MockHBaseClientService(table, kerberosPropsWithoutFile); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, + "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml"); + runner.setProperty(service, kerberosPropsWithoutFile.getKerberosKeytab(), "src/test/resources/fake.keytab"); + runner.setProperty(service, kerberosPropsWithoutFile.getKerberosPrincipal(), "test@REALM"); + runner.enableControllerService(service); + runner.assertNotValid(service); } @Test @@ -370,9 +409,9 @@ public class TestHBase_1_1_2_ClientService { } private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException { - final MockHBaseClientService service = new MockHBaseClientService(table); + final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClient", service); - runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml"); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.enableControllerService(service); runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient"); return service; @@ -409,13 +448,20 @@ public class TestHBase_1_1_2_ClientService { private Table table; private List<Result> results = new ArrayList<>(); + private KerberosProperties kerberosProperties; - public MockHBaseClientService(final Table table) { + public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) { this.table = table; + this.kerberosProperties = kerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return kerberosProperties; } - public void setIsSecurityEnabled(boolean value) { - this.isSecurityEnabled = value; + protected void setKerberosProperties(KerberosProperties properties) { + this.kerberosProperties = properties; } public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml index 0875ea8..2aca105 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml @@ -15,16 +15,16 @@ limitations under the License. --> <configuration> - <property> - <name>fs.default.name</name> - <value>hdfs://hbase</value> - </property> - <property> - <name>hbase.security.authentication</name> - <value>kerberos</value> - </property> - <property> - <name>hbase.security.authorization</name> - <value>true</value> - </property> + <property> + <name>fs.default.name</name> + <value>hdfs://hbase</value> + </property> + <property> + <name>hadoop.security.authentication</name> + <value>kerberos</value> + </property> + <property> + <name>hadoop.security.authorization</name> + <value>true</value> + </property> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml index d022099..c044ee3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml @@ -15,8 +15,8 @@ limitations under the License. --> <configuration> - <property> - <name>fs.default.name</name> - <value>hdfs://hbase</value> - </property> + <property> + <name>fs.default.name</name> + <value>hdfs://hbase</value> + </property> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml new file mode 100644 index 0000000..0875ea8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml @@ -0,0 +1,30 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> + <property> + <name>fs.default.name</name> + <value>hdfs://hbase</value> + </property> + <property> + <name>hbase.security.authentication</name> + <value>kerberos</value> + </property> + <property> + <name>hbase.security.authorization</name> + <value>true</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml new file mode 100644 index 0000000..d022099 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml @@ -0,0 +1,22 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> + <property> + <name>fs.default.name</name> + <value>hdfs://hbase</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/krb5.conf ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/krb5.conf b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/krb5.conf new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml index 91cea1a..3045a5c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml @@ -40,6 +40,7 @@ <configuration> <excludes combine.children="append"> <exclude>src/test/resources/fake.keytab</exclude> + <exclude>src/test/resources/krb5.conf</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ec2db74..63018bb 100644 --- a/pom.xml +++ b/pom.xml @@ -1128,6 +1128,11 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-utils</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>0.6.0-SNAPSHOT</version> <scope>test</scope>
