NIFI-1488 Refactoring HBase Kerberos support
- Storing UGI so we can support multiple HBaseClientServices with different configs
- Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase processors
- Incorporating KerberosProperties into existing hadoop processors
This closes #281 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8773ec3d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8773ec3d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8773ec3d Branch: refs/heads/master Commit: 8773ec3d3e829bc3f7e78a5b08a04b1681b909e0 Parents: 94cf1be Author: Bryan Bende <[email protected]> Authored: Thu Mar 17 17:05:30 2016 -0400 Committer: Bryan Bende <[email protected]> Committed: Thu Mar 17 17:12:57 2016 -0400 ---------------------------------------------------------------------- nifi-commons/nifi-hadoop-utils/pom.xml | 60 ++++++++ .../apache/nifi/hadoop/KerberosProperties.java | 146 +++++++++++++++++++ .../nifi/hadoop/KerberosTicketRenewer.java | 81 ++++++++++ .../org/apache/nifi/hadoop/SecurityUtil.java | 114 +++++++++++++++ .../nifi/hadoop/TestKerberosProperties.java | 96 ++++++++++++ .../src/test/resources/krb5.conf | 12 ++ .../nifi/processor/util/StandardValidators.java | 4 - .../processor/util/TestStandardValidators.java | 20 --- nifi-commons/pom.xml | 1 + .../nifi-hdfs-processors/pom.xml | 4 + .../hadoop/AbstractHadoopProcessor.java | 144 +++++++++--------- .../hadoop/CreateHadoopSequenceFile.java | 23 ++- .../nifi/processors/hadoop/FetchHDFS.java | 28 ++-- ...lowFileStreamUnpackerSequenceFileWriter.java | 19 ++- .../apache/nifi/processors/hadoop/GetHDFS.java | 63 ++++---- .../processors/hadoop/GetHDFSSequenceFile.java | 24 ++- .../nifi/processors/hadoop/KeyValueReader.java | 16 +- .../apache/nifi/processors/hadoop/ListHDFS.java | 40 +++-- .../apache/nifi/processors/hadoop/PutHDFS.java | 43 +++--- .../hadoop/SequenceFileWriterImpl.java | 29 ++-- .../hadoop/TarUnpackerSequenceFileWriter.java | 13 +- .../nifi/processors/hadoop/ValueReader.java | 14 +- .../hadoop/ZipUnpackerSequenceFileWriter.java | 15 +- .../processors/hadoop/util/HDFSListing.java | 9 +- .../hadoop/util/InputStreamWritable.java | 4 +- .../nifi/processors/hadoop/util/LongSerDe.java 
| 10 +- .../hadoop/util/OutputStreamWritable.java | 11 +- .../hadoop/util/SequenceFileReader.java | 4 +- .../hadoop/util/SequenceFileWriter.java | 5 +- .../processors/hadoop/util/StringSerDe.java | 8 +- .../processors/hadoop/AbstractHadoopTest.java | 91 ++++++++---- .../nifi/processors/hadoop/GetHDFSTest.java | 65 +++++++-- .../nifi/processors/hadoop/PutHDFSTest.java | 84 ++++++++--- .../hadoop/SimpleHadoopProcessor.java | 13 +- .../hadoop/TestCreateHadoopSequenceFile.java | 55 +++++-- .../nifi/processors/hadoop/TestListHDFS.java | 52 +++++-- .../src/test/resources/core-site-broken.xml | 2 +- .../src/test/resources/core-site-security.xml | 30 ++++ .../src/test/resources/core-site.xml | 2 +- .../java/org/apache/nifi/hbase/GetHBase.java | 10 +- .../apache/nifi/hbase/HBaseClientService.java | 12 -- .../nifi-hbase_1_1_2-client-service/pom.xml | 5 +- .../nifi/hbase/HBase_1_1_2_ClientService.java | 114 +++++++++------ .../hbase/TestHBase_1_1_2_ClientService.java | 94 +++++++++--- .../src/test/resources/core-site-security.xml | 24 +-- .../src/test/resources/core-site.xml | 8 +- .../src/test/resources/hbase-site-security.xml | 30 ++++ .../src/test/resources/hbase-site.xml | 22 +++ .../src/test/resources/krb5.conf | 0 .../pom.xml | 1 + pom.xml | 5 + 51 files changed, 1274 insertions(+), 505 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml b/nifi-commons/nifi-hadoop-utils/pom.xml new file mode 100644 index 0000000..d0177ce --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/pom.xml @@ -0,0 +1,60 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-hadoop-utils</artifactId> + <version>0.6.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/krb5.conf</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java new file mode 100644 index 0000000..5e8fb7d --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StringUtils; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * All processors and controller services that need properties for Kerberos Principal and Keytab + * should obtain them through this class by calling: + * + * KerberosProperties props = KerberosProperties.create(NiFiProperties.getInstance()) + * + * The properties can be accessed from the resulting KerberosProperties instance. + */ +public class KerberosProperties { + + private final File kerberosConfigFile; + private final Validator kerberosConfigValidator; + private final PropertyDescriptor kerberosPrincipal; + private final PropertyDescriptor kerberosKeytab; + + private KerberosProperties(final File kerberosConfigFile) { + this.kerberosConfigFile = kerberosConfigFile; + + if (this.kerberosConfigFile != null) { + System.setProperty("java.security.krb5.conf", kerberosConfigFile.getAbsolutePath()); + } + + this.kerberosConfigValidator = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + // Check that the Kerberos configuration is set + if (kerberosConfigFile == null) { + return new ValidationResult.Builder() + .subject(subject).input(input).valid(false) + .explanation("you are missing the nifi.kerberos.krb5.file property which " + + "must be set in order to use Kerberos") + .build(); + } + + // Check that the Kerberos configuration is readable + if (!kerberosConfigFile.canRead()) { + return new 
ValidationResult.Builder().subject(subject).input(input).valid(false) + .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + + "and nifi has adequate permissions", kerberosConfigFile.getAbsoluteFile())) + .build(); + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + }; + + this.kerberosPrincipal = new PropertyDescriptor.Builder() + .name("Kerberos Principal") + .required(false) + .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set in your nifi.properties") + .addValidator(kerberosConfigValidator) + .build(); + + this.kerberosKeytab = new PropertyDescriptor.Builder() + .name("Kerberos Keytab").required(false) + .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set in your nifi.properties") + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(kerberosConfigValidator) + .build(); + } + + public static KerberosProperties create(final NiFiProperties niFiProperties) { + if (niFiProperties == null) { + throw new IllegalArgumentException("NiFiProperties can not be null"); + } + return new KerberosProperties(niFiProperties.getKerberosConfigurationFile()); + } + + public File getKerberosConfigFile() { + return kerberosConfigFile; + } + + public Validator getKerberosConfigValidator() { + return kerberosConfigValidator; + } + + public PropertyDescriptor getKerberosPrincipal() { + return kerberosPrincipal; + } + + public PropertyDescriptor getKerberosKeytab() { + return kerberosKeytab; + } + + public static List<ValidationResult> validatePrincipalAndKeytab(final String subject, final Configuration config, final String principal, final String keytab, final ComponentLog logger) { + final List<ValidationResult> results = new ArrayList<>(); + + // if security is enabled then the keytab and principal are required + final boolean isSecurityEnabled = 
SecurityUtil.isSecurityEnabled(config); + + if (isSecurityEnabled && StringUtils.isBlank(principal)) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(subject) + .explanation("Kerberos Principal must be provided when using a secure configuration") + .build()); + } + + if (isSecurityEnabled && StringUtils.isBlank(keytab)) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(subject) + .explanation("Kerberos Keytab must be provided when using a secure configuration") + .build()); + } + + if (!isSecurityEnabled && (!StringUtils.isBlank(principal) || !StringUtils.isBlank(keytab))) { + logger.warn("Configuration does not have security enabled, Keytab and Principal will be ignored"); + } + + return results; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java new file mode 100644 index 0000000..d451535 --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; + +/** + * Periodically attempts to renew the Kerberos user's ticket for the given UGI. + * + * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which + * will re-login the user if the ticket expired or is close to expiry. Between + * relogin attempts this thread will sleep for the provided amount of time. + * + */ +public class KerberosTicketRenewer implements Runnable { + + private final UserGroupInformation ugi; + private final long renewalPeriod; + private final ComponentLog logger; + + private volatile boolean stopped = false; + + /** + * @param ugi + * the user to renew the ticket for + * @param renewalPeriod + * the amount of time in milliseconds to wait between renewal attempts + * @param logger + * the logger from the component that started the renewer + */ + public KerberosTicketRenewer(final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) { + this.ugi = ugi; + this.renewalPeriod = renewalPeriod; + this.logger = logger; + } + + @Override + public void run() { + stopped = false; + while (!stopped) { + try { + logger.debug("Invoking renewal attempt for Kerberos ticket"); + // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime. 
+ ugi.checkTGTAndReloginFromKeytab(); + } catch (IOException e) { + logger.error("Failed to renew Kerberos ticket", e); + } + + // Wait for a bit before checking again. + try { + Thread.sleep(renewalPeriod); + } catch (InterruptedException e) { + logger.error("Renewal thread interrupted", e); + Thread.currentThread().interrupt(); + return; + } + } + } + + public void stop() { + stopped = true; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java new file mode 100644 index 0000000..74197ef --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hadoop; + +import org.apache.commons.lang3.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; + +/** + * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from + * interfering with each other. + */ +public class SecurityUtil { + + /** + * Initializes UserGroupInformation with the given Configuration and performs the login for the given principal + * and keytab. All logins should happen through this class to ensure other threads are not concurrently modifying + * UserGroupInformation. + * + * @param config the configuration instance + * @param principal the principal to authenticate as + * @param keyTab the keytab to authenticate with + * + * @return the UGI for the given principal + * + * @throws IOException if login failed + */ + public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, final String keyTab) + throws IOException { + Validate.notNull(config); + Validate.notNull(principal); + Validate.notNull(keyTab); + + UserGroupInformation.setConfiguration(config); + return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim()); + } + + /** + * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.getLoginUser(). + * All logins should happen through this class to ensure other threads are not concurrently modifying + * UserGroupInformation. 
+ * + * @param config the configuration instance + * + * @return the UGI for the given principal + * + * @throws IOException if login failed + */ + public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException { + Validate.notNull(config); + UserGroupInformation.setConfiguration(config); + return UserGroupInformation.getLoginUser(); + } + + /** + * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.isSecurityEnabled(). + * + * All checks for isSecurityEnabled() should happen through this method. + * + * @param config the given configuration + * + * @return true if kerberos is enabled on the given configuration, false otherwise + * + */ + public static synchronized boolean isSecurityEnabled(final Configuration config) { + Validate.notNull(config); + return "kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication")); + } + + /** + * Start a thread that periodically attempts to renew the current Kerberos user's ticket. + * + * Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread. + * + * @param id + * The unique identifier to use for the thread, can be the class name that started the thread + * (i.e. PutHDFS, etc) + * @param ugi + * The current Kerberos user. + * @param renewalPeriod + * The amount of time between attempting renewals. 
+ * @param logger + * The logger to use with in the renewer + * + * @return the KerberosTicketRenewer Runnable + */ + public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) { + final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger); + + final Thread t = new Thread(renewer); + t.setName("Kerberos Ticket Renewal [" + id + "]"); + t.start(); + + return renewer; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java new file mode 100644 index 0000000..131fe65 --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.util.NiFiProperties; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.List; + +import static org.mockito.Mockito.when; + +public class TestKerberosProperties { + + @Test + public void testWithKerberosConfigFile() { + final File file = new File("src/test/resources/krb5.conf"); + final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); + when(niFiProperties.getKerberosConfigurationFile()).thenReturn(file); + + final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + Assert.assertNotNull(kerberosProperties); + + Assert.assertNotNull(kerberosProperties.getKerberosConfigFile()); + Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator()); + Assert.assertNotNull(kerberosProperties.getKerberosPrincipal()); + Assert.assertNotNull(kerberosProperties.getKerberosKeytab()); + + final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null); + Assert.assertTrue(result.isValid()); + } + + @Test + public void testWithoutKerberosConfigFile() { + final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); + when(niFiProperties.getKerberosConfigurationFile()).thenReturn(null); + + final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + Assert.assertNotNull(kerberosProperties); + + Assert.assertNull(kerberosProperties.getKerberosConfigFile()); + Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator()); + Assert.assertNotNull(kerberosProperties.getKerberosPrincipal()); + Assert.assertNotNull(kerberosProperties.getKerberosKeytab()); + + final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", 
"principal", null); + Assert.assertFalse(result.isValid()); + } + + @Test + public void testValidatePrincipalAndKeytab() { + final ComponentLog log = Mockito.mock(ComponentLog.class); + final Configuration config = new Configuration(); + + // no security enabled in config so doesn't matter what principal and keytab are + List<ValidationResult> results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, null, null, log); + Assert.assertEquals(0, results.size()); + + results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, "principal", null, log); + Assert.assertEquals(0, results.size()); + + results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, "principal", "keytab", log); + Assert.assertEquals(0, results.size()); + + // change the config to have kerberos turned on + config.set("hadoop.security.authentication", "kerberos"); + config.set("hadoop.security.authorization", "true"); + + results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, null, null, log); + Assert.assertEquals(2, results.size()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf new file mode 100644 index 0000000..814d5b2 --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf @@ -0,0 +1,12 @@ +[libdefaults] + default_realm = EXAMPLE.COM + +[realms] + EXAMPLE.COM = { + kdc = kdc1.example.com + kdc = kdc2.example.com + admin_server = kdc1.example.com + } + +[domain_realm] + .example.com = EXAMPLE.COM \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java 
---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index c29b1d2..8255781 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -333,10 +333,6 @@ public class StandardValidators { public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true); - private static final String PRINCIPAL_CHAR_CLASS = "[A-Za-z0-9\\\\\\/\\.@]"; - - public static final Validator KERB_PRINC_VALIDATOR = createRegexMatchingValidator(Pattern.compile(PRINCIPAL_CHAR_CLASS + "+" + - "@" + PRINCIPAL_CHAR_CLASS + "+")); // // // FACTORY METHODS FOR VALIDATORS http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java index 8f2590a..13d2f4f 100644 --- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java @@ -84,25 +84,5 @@ public class TestStandardValidators { vr = val.validate("DataSizeBounds", "water", validationContext); assertFalse(vr.isValid()); - - } - - @Test - public void testKerbPrincipalValidator() { - Validator val = StandardValidators.KERB_PRINC_VALIDATOR; - 
ValidationResult vr; - - final ValidationContext validationContext = Mockito.mock(ValidationContext.class); - vr = val.validate("Kerberos Principal","[email protected]", validationContext); - assertTrue(vr.isValid()); - - vr = val.validate("Kerberos Principal","jon@CDH", validationContext); - assertTrue(vr.isValid()); - - vr = val.validate("kerberos-principal","service/nifi@PROD", validationContext); - assertTrue(vr.isValid()); - - vr = val.validate("keberos-principal", "joewitt", validationContext); - assertFalse(vr.isValid()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml index 9d1448d..9a4542d 100644 --- a/nifi-commons/pom.xml +++ b/nifi-commons/pom.xml @@ -37,5 +37,6 @@ <module>nifi-write-ahead-log</module> <module>nifi-site-to-site-client</module> <module>nifi-hl7-query-language</module> + <module>nifi-hadoop-utils</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index 39b1adb..00b3b04 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -36,6 +36,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-flowfile-packager</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index b536996..cd9683b 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -16,20 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.SocketFactory; - import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -48,12 +34,29 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StringUtils; + +import javax.net.SocketFactory; +import java.io.File; 
+import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * This is a base class that is helpful when building processors interacting with HDFS. @@ -86,82 +89,28 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { } } - - private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - // Check that both the principal & keytab are set before checking the kerberos config - if (context.getProperty(KERBEROS_KEYTAB).getValue() == null || context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("both keytab and principal must be set in order to use Kerberos authentication").build(); - } - - // Check that the Kerberos configuration is set - if (NIFI_PROPERTIES.getKerberosConfigurationFile() == null) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false) - .explanation("you are missing the nifi.kerberos.krb5.file property in nifi.properties. 
" + "This must be set in order to use Kerberos").build(); - } - - // Check that the Kerberos configuration is readable - if (!NIFI_PROPERTIES.getKerberosConfigurationFile().canRead()) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false) - .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + "and nifi has adequate permissions", - NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsoluteFile())) - .build(); - } - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } - }; - // properties public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources") .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop " + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.") .required(false).addValidator(createMultipleFilesExistValidator()).build(); - public static NiFiProperties NIFI_PROPERTIES = null; - public static final String DIRECTORY_PROP_NAME = "Directory"; public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true) .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build(); - public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false) - .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) - .addValidator(KERBEROS_CONFIG_VALIDATOR).build(); - - public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder().name("Kerberos Keytab").required(false) - .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build(); - public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - protected static final List<PropertyDescriptor> properties; - private static final Object RESOURCES_LOCK = new Object(); private long kerberosReloginThreshold; private long lastKerberosReloginTime; - - static { - List<PropertyDescriptor> props = new ArrayList<>(); - props.add(HADOOP_CONFIGURATION_RESOURCES); - props.add(KERBEROS_PRINCIPAL); - props.add(KERBEROS_KEYTAB); - props.add(KERBEROS_RELOGIN_PERIOD); - properties = Collections.unmodifiableList(props); - try { - NIFI_PROPERTIES = NiFiProperties.getInstance(); - } catch (Exception e) { - // This will happen during tests - NIFI_PROPERTIES = null; - } - if (NIFI_PROPERTIES != null && NIFI_PROPERTIES.getKerberosConfigurationFile() != null) { - System.setProperty("java.security.krb5.conf", NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath()); - } - } + protected KerberosProperties kerberosProperties; + protected List<PropertyDescriptor> properties; // variables shared by all threads of this processor // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) @@ -170,6 +119,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { @Override protected void init(ProcessorInitializationContext context) { hdfsResources.set(new HdfsResources(null, null, null)); + kerberosProperties = getKerberosProperties(); + + List<PropertyDescriptor> props = new ArrayList<>(); + 
props.add(HADOOP_CONFIGURATION_RESOURCES); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); + props.add(KERBEROS_RELOGIN_PERIOD); + properties = Collections.unmodifiableList(props); + } + + protected KerberosProperties getKerberosProperties() { + return KerberosProperties.create(NiFiProperties.getInstance()); } @Override @@ -177,9 +138,36 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { return properties; } + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); + final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + final List<ValidationResult> results = new ArrayList<>(); + + if (!StringUtils.isBlank(configResources)) { + Configuration conf = null; + try { + conf = getConfigurationFromResources(configResources); + + results.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), conf, principal, keytab, getLogger())); + } catch (IOException e) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(this.getClass().getSimpleName()) + .explanation("Could not load Hadoop Configuration resources") + .build()); + } + } + + return results; + } + /* - * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) - */ + * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) + */ @OnScheduled public final void abstractOnScheduled(ProcessContext context) throws IOException { try { @@ -261,18 +249,16 @@ public abstract 
class AbstractHadoopProcessor extends AbstractProcessor { FileSystem fs; UserGroupInformation ugi; synchronized (RESOURCES_LOCK) { - if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) { - String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue(); - String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue(); - UserGroupInformation.setConfiguration(config); - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab); + if (SecurityUtil.isSecurityEnabled(config)) { + String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + ugi = SecurityUtil.loginKerberos(config, principal, keyTab); fs = getFileSystemAsUser(config, ugi); lastKerberosReloginTime = System.currentTimeMillis() / 1000; } else { config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); config.set("hadoop.security.authentication", "simple"); - UserGroupInformation.setConfiguration(config); - ugi = UserGroupInformation.getLoginUser(); + ugi = SecurityUtil.loginSimple(config); fs = getFileSystemAsUser(config, ugi); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java index 385ac73..4b8f87e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java +++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.hadoop.io.SequenceFile; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -38,6 +32,12 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + /** * <p> * This processor is used to create a Hadoop Sequence File, which essentially is a file of key/value pairs. The key will be a file name and the value will be the flow file content. The processor will @@ -93,13 +93,6 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor { .allowableValues(CompressionType.values()) .build(); - private static final List<PropertyDescriptor> props; - - static { - List<PropertyDescriptor> someProps = new ArrayList<>(properties); - someProps.add(COMPRESSION_TYPE); - props = Collections.unmodifiableList(someProps); - } // Default Values. 
public static final String DEFAULT_COMPRESSION_TYPE = "NONE"; @@ -110,7 +103,9 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor { @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return props; + List<PropertyDescriptor> someProps = new ArrayList<>(properties); + someProps.add(COMPRESSION_TYPE); + return someProps; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 6b04910..fdb2dcf 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +36,15 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + 
@SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) @@ -80,13 +80,9 @@ public class FetchHDFS extends AbstractHadoopProcessor { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(FILENAME); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_KEYTAB); - properties.add(KERBEROS_RELOGIN_PERIOD); - return properties; + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(FILENAME); + return props; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java index d3fb97f..0194e87 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.processors.hadoop; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import 
org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.FlowFilePackagerV3; +import org.slf4j.LoggerFactory; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -23,16 +32,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; -import org.apache.nifi.util.FlowFilePackagerV3; - -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.slf4j.LoggerFactory; - public class FlowFileStreamUnpackerSequenceFileWriter extends SequenceFileWriterImpl { static { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 4c9deea..d18c13f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -16,22 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; 
-import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -63,6 +47,22 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) @@ -178,28 +178,12 @@ public class GetHDFS extends AbstractHadoopProcessor { .build(); private static final Set<Relationship> relationships; - protected static final List<PropertyDescriptor> localProperties; static { final Set<Relationship> rels = new HashSet<>(); rels.add(REL_SUCCESS); rels.add(REL_PASSTHROUGH); relationships = Collections.unmodifiableSet(rels); - - List<PropertyDescriptor> props = new ArrayList<>(properties); - props.add(DIRECTORY); - props.add(RECURSE_SUBDIRS); - props.add(KEEP_SOURCE_FILE); - props.add(FILE_FILTER_REGEX); - props.add(FILTER_MATCH_NAME_ONLY); - props.add(IGNORE_DOTTED_FILES); - props.add(MIN_AGE); - props.add(MAX_AGE); - props.add(POLLING_INTERVAL); - props.add(BATCH_SIZE); - props.add(BUFFER_SIZE); - props.add(COMPRESSION_CODEC); - localProperties = Collections.unmodifiableList(props); } protected 
ProcessorConfiguration processorConfig; @@ -219,7 +203,20 @@ public class GetHDFS extends AbstractHadoopProcessor { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return localProperties; + List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(DIRECTORY); + props.add(RECURSE_SUBDIRS); + props.add(KEEP_SOURCE_FILE); + props.add(FILE_FILTER_REGEX); + props.add(FILTER_MATCH_NAME_ONLY); + props.add(IGNORE_DOTTED_FILES); + props.add(MIN_AGE); + props.add(MAX_AGE); + props.add(POLLING_INTERVAL); + props.add(BATCH_SIZE); + props.add(BUFFER_SIZE); + props.add(COMPRESSION_CODEC); + return props; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java index f032ee4..7de728a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,6 +32,12 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import 
org.apache.nifi.util.StopWatch; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + /** * This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested * SequenceFile. The created flow file's content depends on the value of the optional configuration property FlowFile Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR. With the @@ -64,17 +64,11 @@ public class GetHDFSSequenceFile extends GetHDFS { .required(true) .build(); - static final List<PropertyDescriptor> props; - - static { - List<PropertyDescriptor> someProps = new ArrayList<>(localProperties); - someProps.add(FLOWFILE_CONTENT); - props = Collections.unmodifiableList(someProps); - } - @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return props; + List<PropertyDescriptor> someProps = new ArrayList<>(super.getSupportedPropertyDescriptors()); + someProps.add(FLOWFILE_CONTENT); + return Collections.unmodifiableList(someProps); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java index 896e4d8..2aa77e1 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java @@ 
-16,14 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashSet; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -42,6 +34,14 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; + /** * This class reads a SequenceFile and generates FlowFiles, one per KeyValue pair in the SequenceFile. The FlowFile name is based on the the incoming file name with System nanotime appended; the * FlowFile content is the key/value pair serialized via Text. http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 2100f48..f5daef2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -16,18 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import 
java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,13 +46,25 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.util.HDFSListing; -import org.apache.nifi.processors.hadoop.util.StringSerDe; import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys; +import org.apache.nifi.processors.hadoop.util.StringSerDe; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + @TriggerSerially @TriggerWhenEmpty @@ -143,15 +143,11 @@ public class ListHDFS extends AbstractHadoopProcessor { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(DISTRIBUTED_CACHE_SERVICE); - properties.add(DIRECTORY); - properties.add(RECURSE_SUBDIRS); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_KEYTAB); - properties.add(KERBEROS_RELOGIN_PERIOD); - return properties; + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(DISTRIBUTED_CACHE_SERVICE); + props.add(DIRECTORY); + props.add(RECURSE_SUBDIRS); + return props; } @Override 
http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 0c36928..7c97478 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,6 +47,17 @@ import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + /** * This processor copies FlowFiles to HDFS. 
*/ @@ -144,14 +144,21 @@ public class PutHDFS extends AbstractHadoopProcessor { .build(); private static final Set<Relationship> relationships; - private static final List<PropertyDescriptor> localProperties; static { final Set<Relationship> rels = new HashSet<>(); rels.add(REL_SUCCESS); rels.add(REL_FAILURE); relationships = Collections.unmodifiableSet(rels); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { List<PropertyDescriptor> props = new ArrayList<>(properties); props.add(DIRECTORY); props.add(CONFLICT_RESOLUTION); @@ -162,17 +169,7 @@ public class PutHDFS extends AbstractHadoopProcessor { props.add(REMOTE_OWNER); props.add(REMOTE_GROUP); props.add(COMPRESSION_CODEC); - localProperties = Collections.unmodifiableList(props); - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return localProperties; + return props; } @OnScheduled http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java index 4bb9ca9..a0d02f7 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java @@ -16,21 +16,6 @@ */ package 
org.apache.nifi.processors.hadoop; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; -import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; -import org.apache.nifi.util.StopWatch; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -40,9 +25,23 @@ import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; + public class SequenceFileWriterImpl implements SequenceFileWriter { protected static Logger logger = LoggerFactory.getLogger(SequenceFileWriterImpl.class); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java index 82e1de2..fbc1875 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java @@ -16,19 +16,18 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; - import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.stream.io.BufferedInputStream; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; + public class TarUnpackerSequenceFileWriter extends SequenceFileWriterImpl { static { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java ---------------------------------------------------------------------- diff 
--git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java index a6f7005..0ef103d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashSet; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -41,6 +34,13 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; + /** * This class reads a SequenceFile and generates FlowFiles, one per each KeyValue Pair in the SequenceFile. The FlowFile name is the key, which is typically a file name but may not be; the FlowFile * content is the value. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java index c986e9a..8ce6a7e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java @@ -16,20 +16,19 @@ */ package org.apache.nifi.processors.hadoop; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; - -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.slf4j.LoggerFactory; - public class ZipUnpackerSequenceFileWriter extends SequenceFileWriterImpl { static { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java index 6786945..2d1c01a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.hadoop.fs.Path; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -24,11 +28,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import javax.xml.bind.annotation.XmlTransient; -import javax.xml.bind.annotation.XmlType; - -import org.apache.hadoop.fs.Path; - /** * A simple POJO for maintaining state about the last HDFS Listing that was performed so that * we can avoid pulling the same file multiple times http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java index 4cb2e8d..a463dfd 100644 --- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java @@ -16,13 +16,13 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.hadoop.io.Writable; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.io.Writable; - /** * Simple implementation of {@link Writable} that writes data from an InputStream. This class will throw an * <tt>UnsupportedOperationException</tt> if {@link #readFields(DataInput)} is called. http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java index 17cacd9..25e66f8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java @@ -16,17 +16,17 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import 
java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; - public class LongSerDe implements Serializer<Long>, Deserializer<Long> { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java index e954721..234ca2a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java @@ -16,17 +16,16 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.DataOutputStream; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.DataOutputStream; - -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.Writable; - /** * This class will write to an output stream, rather than an in-memory buffer, the fields being 
read. * http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java index e197426..fabf18d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.processors.hadoop.util; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import java.io.IOException; + public interface SequenceFileReader<T> { public T readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException; http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java index 851afd8..6ad6461 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java +++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java @@ -16,11 +16,10 @@ */ package org.apache.nifi.processors.hadoop.util; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; public interface SequenceFileWriter { http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java index 2a52c4d..e13f6b8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java @@ -16,15 +16,15 @@ */ package org.apache.nifi.processors.hadoop.util; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; - import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + public class StringSerDe implements Serializer<String>, 
Deserializer<String> { @Override
