http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java new file mode 100644 index 0000000..d5ab8c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class DateTimeUtils { + public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() + .name("Date Format") + .description("Specifies the format to use when reading/writing Date fields") + .expressionLanguageSupported(false) + .defaultValue(RecordFieldType.DATE.getDefaultFormat()) + .addValidator(new SimpleDateFormatValidator()) + .required(true) + .build(); + + public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() + .name("Time Format") + .description("Specifies the format to use when reading/writing Time fields") + .expressionLanguageSupported(false) + .defaultValue(RecordFieldType.TIME.getDefaultFormat()) + .addValidator(new SimpleDateFormatValidator()) + .required(true) + .build(); + + public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() + .name("Timestamp Format") + .description("Specifies the format to use when reading/writing Timestamp fields") + .expressionLanguageSupported(false) + .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat()) + .addValidator(new SimpleDateFormatValidator()) + .required(true) + .build(); +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java new file mode 100644 index 0000000..f25749b --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import java.text.SimpleDateFormat; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +public class SimpleDateFormatValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + new SimpleDateFormat(input); + } catch (final Exception e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Invalid Date format: " + e.getMessage()) + .build(); + } + + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml new file mode 100644 index 0000000..fea0920 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-extension-utils</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <packaging>pom</packaging> + <artifactId>nifi-record-utils</artifactId> + + <modules> + <module>nifi-avro-record-utils</module> + <module>nifi-standard-record-utils</module> + <module>nifi-hadoop-record-utils</module> + <module>nifi-mock-record-utils</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml new file mode 100644 index 0000000..4e1a0f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <packaging>pom</packaging> + <artifactId>nifi-extension-utils</artifactId> + <description> + This module contains reusable utilities related to extensions that can be shared across NARs. + </description> + + <modules> + <module>nifi-record-utils</module> + <module>nifi-hadoop-utils</module> + <module>nifi-processor-utils</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index 7429ea8..493caf0 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -33,12 +33,6 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-hadoop-utils</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.nifi</groupId> @@ -50,16 +44,16 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-distributed-cache-client-service-api</artifactId> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java deleted file mode 100644 index dac2b30..0000000 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.hadoop; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.Lz4Codec; -import org.apache.hadoop.io.compress.SnappyCodec; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.hadoop.KerberosProperties; -import org.apache.nifi.hadoop.SecurityUtil; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -import javax.net.SocketFactory; -import java.io.File; -import java.io.IOException; -import java.lang.ref.WeakReference; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * This is a base class that is helpful when building processors interacting with HDFS. - */ -@RequiresInstanceClassLoading(cloneAncestorResources = true) -public abstract class AbstractHadoopProcessor extends AbstractProcessor { - /** - * Compression Type Enum - */ - public enum CompressionType { - NONE, - DEFAULT, - BZIP, - GZIP, - LZ4, - SNAPPY, - AUTOMATIC; - - @Override - public String toString() { - switch (this) { - case NONE: return "NONE"; - case DEFAULT: return DefaultCodec.class.getName(); - case BZIP: return BZip2Codec.class.getName(); - case GZIP: return GzipCodec.class.getName(); - case LZ4: return Lz4Codec.class.getName(); - case SNAPPY: return SnappyCodec.class.getName(); - case AUTOMATIC: return "Automatically Detected"; - } - return null; - } - } - - // properties - public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() - .name("Hadoop Configuration Resources") - .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop " - + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.") - .required(false) - .addValidator(createMultipleFilesExistValidator()) - .build(); - - public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name("Directory") - .description("The HDFS directory from which files should be read") - .required(true) - .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - - public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression codec") - .required(true) - .allowableValues(CompressionType.values()) - .defaultValue(CompressionType.NONE.toString()) - .build(); - - public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder() - .name("Kerberos Relogin Period").required(false) - .description("Period of time which should pass before attempting a kerberos relogin") - .defaultValue("4 hours") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder() - .name("Additional Classpath Resources") - .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " + - "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dynamicallyModifiesClasspath(true) - .build(); - - private static final Object RESOURCES_LOCK = new Object(); - - private long kerberosReloginThreshold; - private long lastKerberosReloginTime; - protected KerberosProperties kerberosProperties; - protected List<PropertyDescriptor> properties; - private volatile File kerberosConfigFile = null; - - // variables shared by all threads of this processor - // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) - private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>(); - - // Holder of cached Configuration information so validation does not reload the same config over and over - private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>(); - - @Override - protected void init(ProcessorInitializationContext context) { - hdfsResources.set(new HdfsResources(null, null, null)); - - kerberosConfigFile = context.getKerberosConfigurationFile(); - kerberosProperties = getKerberosProperties(kerberosConfigFile); - - List<PropertyDescriptor> props = new ArrayList<>(); - props.add(HADOOP_CONFIGURATION_RESOURCES); - props.add(kerberosProperties.getKerberosPrincipal()); - props.add(kerberosProperties.getKerberosKeytab()); - props.add(KERBEROS_RELOGIN_PERIOD); - props.add(ADDITIONAL_CLASSPATH_RESOURCES); - properties = Collections.unmodifiableList(props); - } - - protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { - return new KerberosProperties(kerberosConfigFile); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); - final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); - final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); - - final List<ValidationResult> results = new ArrayList<>(); - - if (!StringUtils.isBlank(configResources)) { - try { - ValidationResources resources = validationResourceHolder.get(); - - // if no resources in the holder, or if the holder has different resources loaded, - // then load the Configuration and set the new resources in the holder - if (resources == null || !configResources.equals(resources.getConfigResources())) { - getLogger().debug("Reloading validation resources"); - final Configuration config = new ExtendedConfiguration(getLogger()); - config.setClassLoader(Thread.currentThread().getContextClassLoader()); - resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources)); - validationResourceHolder.set(resources); - } - - final Configuration conf = resources.getConfiguration(); - results.addAll(KerberosProperties.validatePrincipalAndKeytab( - this.getClass().getSimpleName(), conf, principal, keytab, getLogger())); - - } catch (IOException e) { - results.add(new ValidationResult.Builder() - .valid(false) - .subject(this.getClass().getSimpleName()) - .explanation("Could not load Hadoop Configuration resources") - .build()); - } - } - - return results; - } - - /* - * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) - */ - @OnScheduled - public final void abstractOnScheduled(ProcessContext context) throws IOException { - try { - // This value will be null when called from ListHDFS, because it overrides all of the default - // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos - if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) { - kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS); - } - HdfsResources resources = hdfsResources.get(); - if (resources.getConfiguration() == null) { - final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); - resources = resetHDFSResources(configResources, context); - hdfsResources.set(resources); - } - } catch (IOException ex) { - getLogger().error("HDFS Configuration error - {}", new Object[] { ex }); - hdfsResources.set(new HdfsResources(null, null, null)); - throw ex; - } - } - - @OnStopped - public final void abstractOnStopped() { - hdfsResources.set(new HdfsResources(null, null, null)); - } - - private static Configuration getConfigurationFromResources(final Configuration config, String configResources) throws IOException { - boolean foundResources = false; - if (null != configResources) { - String[] resources = configResources.split(","); - for (String resource : resources) { - config.addResource(new Path(resource.trim())); - foundResources = true; - } - } - - if (!foundResources) { - // check that at least 1 non-default resource is available on the classpath - String configStr = config.toString(); - for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) { - if (!resource.contains("default") && config.getResource(resource.trim()) != null) { - foundResources = true; - break; - } - } - } - - if (!foundResources) { - throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath"); - } - return config; - } - - /* - * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. - */ - HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException { - Configuration config = new ExtendedConfiguration(getLogger()); - config.setClassLoader(Thread.currentThread().getContextClassLoader()); - - getConfigurationFromResources(config, configResources); - - // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout - checkHdfsUriForTimeout(config); - - // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete - // restart - String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme()); - config.set(disableCacheName, "true"); - - // If kerberos is enabled, create the file system as the kerberos principal - // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time - FileSystem fs; - UserGroupInformation ugi; - synchronized (RESOURCES_LOCK) { - if (SecurityUtil.isSecurityEnabled(config)) { - String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); - String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); - ugi = SecurityUtil.loginKerberos(config, principal, keyTab); - fs = getFileSystemAsUser(config, ugi); - lastKerberosReloginTime = System.currentTimeMillis() / 1000; - } else { - config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); - config.set("hadoop.security.authentication", "simple"); - ugi = SecurityUtil.loginSimple(config); - fs = getFileSystemAsUser(config, ugi); - } - } - getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi}); - - final Path workingDir = fs.getWorkingDirectory(); - getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", - new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()}); - - return new HdfsResources(config, fs, ugi); - } - - /** - * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received - * - * @param config - * the configuration to use - * @return the FileSystem that is created for the given Configuration - * @throws IOException - * if unable to create the FileSystem - */ - protected FileSystem getFileSystem(final Configuration config) throws IOException { - return FileSystem.get(config); - } - - protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException { - try { - return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { - @Override - public FileSystem run() throws Exception { - return FileSystem.get(config); - } - }); - } catch (InterruptedException e) { - throw new IOException("Unable to create file system: " + e.getMessage()); - } - } - - /* - * Drastically reduce the timeout of a socket connection from the default in FileSystem.get() - */ - protected void checkHdfsUriForTimeout(Configuration config) throws IOException { - URI hdfsUri = FileSystem.getDefaultUri(config); - String address = hdfsUri.getAuthority(); - int port = hdfsUri.getPort(); - if (address == null || address.isEmpty() || port < 0) { - return; - } - InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); - SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); - Socket socket = null; - try { - socket = socketFactory.createSocket(); - NetUtils.connect(socket, namenode, 1000); // 1 second timeout - } finally { - IOUtils.closeQuietly(socket); - } - } - - /* - * Validates that one or more files exist, as specified in a single property. - */ - public static final Validator createMultipleFilesExistValidator() { - return new Validator() { - - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - final String[] files = input.split(","); - for (String filename : files) { - try { - final File file = new File(filename.trim()); - final boolean valid = file.exists() && file.isFile(); - if (!valid) { - final String message = "File " + file + " does not exist or is not a file"; - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build(); - } - } catch (SecurityException e) { - final String message = "Unable to access " + filename + " due to " + e.getMessage(); - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build(); - } - } - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } - - }; - } - - /** - * Returns the configured CompressionCodec, or null if none is configured. - * - * @param context - * the ProcessContext - * @param configuration - * the Hadoop Configuration - * @return CompressionCodec or null - */ - protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) { - org.apache.hadoop.io.compress.CompressionCodec codec = null; - if (context.getProperty(COMPRESSION_CODEC).isSet()) { - String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString(); - CompressionCodecFactory ccf = new CompressionCodecFactory(configuration); - codec = ccf.getCodecByClassName(compressionClassname); - } - - return codec; - } - - /** - * Returns the relative path of the child that does not include the filename or the root path. - * - * @param root - * the path to relativize from - * @param child - * the path to relativize - * @return the relative path - */ - public static String getPathDifference(final Path root, final Path child) { - final int depthDiff = child.depth() - root.depth(); - if (depthDiff <= 1) { - return "".intern(); - } - String lastRoot = root.getName(); - Path childsParent = child.getParent(); - final StringBuilder builder = new StringBuilder(); - builder.append(childsParent.getName()); - for (int i = (depthDiff - 3); i >= 0; i--) { - childsParent = childsParent.getParent(); - String name = childsParent.getName(); - if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { - break; - } - builder.insert(0, Path.SEPARATOR).insert(0, name); - } - return builder.toString(); - } - - protected Configuration getConfiguration() { - return hdfsResources.get().getConfiguration(); - } - - protected FileSystem getFileSystem() { - // trigger Relogin if necessary - getUserGroupInformation(); - return hdfsResources.get().getFileSystem(); - } - - protected UserGroupInformation getUserGroupInformation() { - // if kerberos is enabled, check if the ticket should be renewed before returning - UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation(); - if (userGroupInformation != null && isTicketOld()) { - tryKerberosRelogin(userGroupInformation); - } - return userGroupInformation; - } - - protected void tryKerberosRelogin(UserGroupInformation ugi) { - try { - getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " + - "attempting to renew ticket for user {}", new Object[]{ - kerberosReloginThreshold, ugi.getUserName()}); - ugi.doAs((PrivilegedExceptionAction<Void>) () -> { - ugi.checkTGTAndReloginFromKeytab(); - return null; - }); - lastKerberosReloginTime = System.currentTimeMillis() / 1000; - getLogger().info("Kerberos relogin successful or ticket still valid"); - } catch (IOException e) { - // Most likely case of this happening is ticket is expired and error getting a new one, - // meaning dfs operations would fail - getLogger().error("Kerberos relogin failed", e); - throw new ProcessException("Unable to renew kerberos ticket", e); - } catch (InterruptedException e) { - getLogger().error("Interrupted while attempting Kerberos relogin", e); - throw new ProcessException("Unable to renew kerberos ticket", e); - } - } - - protected boolean isTicketOld() { - return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold; - } - - - static protected class HdfsResources { - private final Configuration configuration; - private final FileSystem fileSystem; - private final UserGroupInformation userGroupInformation; - - public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) { - this.configuration = configuration; - this.fileSystem = fileSystem; - this.userGroupInformation = userGroupInformation; - } - - public Configuration getConfiguration() { - return configuration; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public UserGroupInformation getUserGroupInformation() { - return userGroupInformation; - } - } - - static protected class ValidationResources { - private final String configResources; - private final Configuration configuration; - - public ValidationResources(String configResources, Configuration configuration) { - this.configResources = configResources; - this.configuration = configuration; - } - - public String getConfigResources() { - return configResources; - } - - public Configuration getConfiguration() { - return configuration; - } - } - - /** - * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be - * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load - * something that was previously not found, but might now be available. - * - * Reference the original getClassByNameOrNull from Configuration. - */ - static class ExtendedConfiguration extends Configuration { - - private final ComponentLog logger; - private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>(); - - public ExtendedConfiguration(final ComponentLog logger) { - this.logger = logger; - } - - public Class<?> getClassByNameOrNull(String name) { - final ClassLoader classLoader = getClassLoader(); - - Map<String, WeakReference<Class<?>>> map; - synchronized (CACHE_CLASSES) { - map = CACHE_CLASSES.get(classLoader); - if (map == null) { - map = Collections.synchronizedMap(new WeakHashMap<>()); - CACHE_CLASSES.put(classLoader, map); - } - } - - Class<?> clazz = null; - WeakReference<Class<?>> ref = map.get(name); - if (ref != null) { - clazz = ref.get(); - } - - if (clazz == null) { - try { - clazz = Class.forName(name, true, classLoader); - } catch (ClassNotFoundException e) { - logger.error(e.getMessage(), e); - return null; - } - // two putters can race here, but they'll put the same class - map.put(name, new WeakReference<>(clazz)); - return clazz; - } else { - // cache hit - return clazz; - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 6232936..69b5b77 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -36,9 +36,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; @@ -96,8 +93,6 @@ public class PutHDFS extends AbstractHadoopProcessor { public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; public static final int BUFFER_SIZE_DEFAULT = 4096; - public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path"; - // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -135,14 +130,14 @@ public class PutHDFS extends AbstractHadoopProcessor { public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder() .name("Replication") .description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration") - .addValidator(createPositiveShortValidator()) + .addValidator(HadoopValidators.POSITIVE_SHORT_VALIDATOR) .build(); public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder() .name("Permissions umask") .description( "A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode") - .addValidator(createUmaskValidator()) + .addValidator(HadoopValidators.UMASK_VALIDATOR) .build(); public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder() @@ -404,51 +399,4 @@ public class PutHDFS extends AbstractHadoopProcessor { } } - /* - * Validates that a property is a valid short number greater than 0. - */ - static Validator createPositiveShortValidator() { - return new Validator() { - @Override - public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - String reason = null; - try { - final short shortVal = Short.parseShort(value); - if (shortVal <= 0) { - reason = "short integer must be greater than zero"; - } - } catch (final NumberFormatException e) { - reason = "[" + value + "] is not a valid short integer"; - } - return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null) - .build(); - } - }; - } - - /* - * Validates that a property is a valid umask, i.e. a short octal number that is not negative. - */ - static Validator createUmaskValidator() { - return new Validator() { - @Override - public ValidationResult validate(final String subject, final String value, final ValidationContext context) { - String reason = null; - try { - final short shortVal = Short.parseShort(value, 8); - if (shortVal < 0) { - reason = "octal umask [" + value + "] cannot be negative"; - } else if (shortVal > 511) { - // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked - reason = "octal umask [" + value + "] is not a valid umask"; - } - } catch (final NumberFormatException e) { - reason = "[" + value + "] is not a valid short octal number"; - } - return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null) - .build(); - } - }; - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java index 1ac62af..bbb050a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java @@ -206,7 +206,7 @@ public class TestCreateHadoopSequenceFile { @Test public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException { - controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name()); + controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.BZIP.name()); controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name()); File inFile = inFiles[0]; @@ -253,7 +253,7 @@ public class TestCreateHadoopSequenceFile { @Test public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException { - controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name()); + controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.DEFAULT.name()); controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name()); File inFile = inFiles[0]; @@ -300,7 +300,7 @@ public class TestCreateHadoopSequenceFile { @Test public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException { - controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name()); + controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, CompressionType.NONE.name()); controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name()); File inFile = inFiles[0]; http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml index 661180e..2c50ceb 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml @@ -84,12 +84,6 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-hadoop-utils</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>com.github.stephenc.findbugs</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml new file mode 100644 index 0000000..dfe67a3 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/pom.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parquet-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-parquet-nar</artifactId> + <version>1.2.0-SNAPSHOT</version> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parquet-processors</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-libraries-nar</artifactId> + <type>nar</type> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..958de4d --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,239 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core' + which is available under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..4c40843 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,105 @@ +nifi-parquet-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2013 The Apache Software Foundation + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson ([email protected]) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Compress + The following NOTICE information applies: + Apache Commons Compress + Copyright 2002-2014 The Apache Software Foundation + + The files in the package org.apache.commons.compress.archivers.sevenz + were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/), + which has been placed in the public domain: + + "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html) + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Parquet + The following NOTICE information applies: + Apache Parquet MR (Incubating) + Copyright 2014 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta ([email protected]), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) Snappy Java + The following NOTICE information applies: + This product includes software developed by Google + Snappy: http://code.google.com/p/snappy/ (New BSD License) + + This product includes software developed by Apache + PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ + (Apache 2.0 license) + + This library containd statically linked libstdc++. This inclusion is allowed by + "GCC RUntime Library Exception" + http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html + +***************** +Public Domain +***************** + +The following binary components are provided to the 'Public Domain'. See project link for details. + + (Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html + http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml new file mode 100644 index 0000000..c8c1078 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parquet-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-parquet-processors</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>1.8.2</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-record-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/avro/user.avsc</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java new file mode 100644 index 0000000..3536944 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.parquet; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord; +import org.apache.nifi.processors.hadoop.record.HDFSRecordReader; +import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; + +import java.io.IOException; + +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"parquet", "hadoop", "HDFS", "get", "ingest", "fetch", "source", "restricted"}) +@CapabilityDescription("Reads from a given Parquet file and writes records to the content of the flow file using " + + "the selected record writer. The original Parquet file will remain unchanged, and the content of the flow file " + + "will be replaced with records of the selected type. This processor can be used with ListHDFS or ListFile to obtain " + + "a listing of files to fetch.") +@WritesAttributes({ + @WritesAttribute(attribute="fetch.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added " + + "indicating why the file could not be fetched from the given filesystem."), + @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file") +}) +@SeeAlso({PutParquet.class}) +@Restricted("Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.") +public class FetchParquet extends AbstractFetchHDFSRecord { + + @Override + public HDFSRecordReader createHDFSRecordReader(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path) + throws IOException { + final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader.<GenericRecord>builder(path).withConf(conf); + return new AvroParquetHDFSRecordReader(readerBuilder.build()); + } + +}
