[ https://issues.apache.org/jira/browse/NIFI-3724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991319#comment-15991319 ]

ASF GitHub Bot commented on NIFI-3724:
--------------------------------------

Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114179324
  
    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java ---
    @@ -0,0 +1,505 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.permission.FsPermission;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.exception.FailureException;
    +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
    +import org.apache.nifi.schema.access.SchemaAccessStrategy;
    +import org.apache.nifi.schema.access.SchemaAccessUtils;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.schemaregistry.services.SchemaRegistry;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
    +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
    +
    +/**
    + * Base class for processors that write Records to HDFS.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield since we are triggering when empty
    +public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
    +
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
    +            .name("compression-type")
    +            .displayName("Compression Type")
    +            .description("The type of compression for the file being written.")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
    +            .name("overwrite")
    +            .displayName("Overwrite Files")
    +            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, " +
    +                    "flow files will be routed to failure when a file exists in the same directory with the same name.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
    +            .name("permissions-umask")
    +            .displayName("Permissions umask")
    +            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. " +
    +                    "This overrides the Hadoop Configuration dfs.umaskmode")
    +            .addValidator(HadoopValidators.UMASK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
    +            .name("remote-owner")
    +            .displayName("Remote Owner")
    +            .description("Changes the owner of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
    +            .name("remote-group")
    +            .displayName("Remote Group")
    +            .description("Changes the group of the HDFS file to this value after it is written. " +
    +                    "This only works if NiFi is running as a user that has HDFS super user privilege to change group")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The service for reading records from incoming flow files.")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("Flow Files that have been successfully processed are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("Flow Files that could not be processed due to issues that cannot be retried are transferred to this relationship")
    +            .build();
    +
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile String remoteOwner;
    +    private volatile String remoteGroup;
    +    private volatile SchemaAccessStrategy schemaAccessStrategy;
    +
    +    private volatile Set<Relationship> putHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> putHdfsRecordProperties;
    +
    +    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
    +            SCHEMA_NAME_PROPERTY,
    +            SCHEMA_TEXT_PROPERTY,
    +            HWX_SCHEMA_REF_ATTRIBUTES,
    +            HWX_CONTENT_ENCODED_SCHEMA
    +    ));
    +
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.putHdfsRecordRelationships = Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(RECORD_READER);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(DIRECTORY)
    +                .description("The parent directory to which files should 
be written. Will be created if it doesn't exist.")
    +                .build());
    +
    +        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
    +                .description("Specifies how to obtain the schema that is 
to be used for writing the data.")
    +                .allowableValues(strategies)
    +                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
    +                .build());
    +
    +        props.add(SCHEMA_REGISTRY);
    +        props.add(SCHEMA_NAME);
    +        props.add(SCHEMA_TEXT);
    +
    +        final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
    +
    +        props.add(new PropertyDescriptor.Builder()
    +                .fromPropertyDescriptor(COMPRESSION_TYPE)
    +                .allowableValues(compressionTypes)
    +                .defaultValue(getDefaultCompressionType(context))
    +                .build());
    +
    +        props.add(OVERWRITE);
    +        props.add(UMASK);
    +        props.add(REMOTE_GROUP);
    +        props.add(REMOTE_OWNER);
    +        props.addAll(getAdditionalProperties());
    +        this.putHdfsRecordProperties = Collections.unmodifiableList(props);
    +    }
    +
    +    protected List<AllowableValue> getSchemaAccessStrategyValues() {
    +        return strategyList;
    +    }
    +
    +    protected AllowableValue getDefaultSchemaAccessStrategy() {
    +        return SCHEMA_NAME_PROPERTY;
    +    }
    +
    +    private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
    +        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +    }
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the possible compression types
    +     */
    +    public abstract List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context);
    +
    +    /**
    +     * @param context the initialization context
    +     * @return the default compression type
    +     */
    +    public abstract String getDefaultCompressionType(final ProcessorInitializationContext context);
    +
    +    /**
    +     * Allows sub-classes to add additional properties; called from init().
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return putHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return putHdfsRecordProperties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
    +        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
    +    }
    +
    +    @OnScheduled
    +    public final void onScheduled(final ProcessContext context) throws IOException {
    +        super.abstractOnScheduled(context);
    +
    +        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
    +
    +        final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    +        final String schemaAccess = context.getProperty(descriptor).getValue();
    +        this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
    +
    +        this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
    +        this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
    +
    +        // Set umask once, to avoid thread safety issues doing it in onTrigger
    +        final PropertyValue umaskProp = context.getProperty(UMASK);
    +        final short dfsUmask;
    +        if (umaskProp.isSet()) {
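    +            // the umask property value is an octal string (e.g. "022"), hence radix 8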
    +            dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
    +        } else {
    +            dfsUmask = FsPermission.DEFAULT_UMASK;
    +        }
    +        final Configuration conf = getConfiguration();
    +        FsPermission.setUMask(conf, new FsPermission(dfsUmask));
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordWriter.
    +     *
    +     * @param context the process context to obtain additional configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @param schema the schema for writing
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer or processing the schema
    +     */
    +    public abstract HDFSRecordWriter createHDFSRecordWriter(
    +            final ProcessContext context,
    +            final FlowFile flowFile,
    +            final Configuration conf,
    +            final Path path,
    +            final RecordSchema schema) throws IOException, SchemaNotFoundException;
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because 
Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            Path tempDotCopyFile = null;
    +            FlowFile putFlowFile = flowFile;
    +            try {
    +                final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key()); // TODO codec extension
    +                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
    +
    +                // create the directory if it doesn't exist
    +                final Path directoryPath = new Path(directoryValue);
    +                createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
    +
    +                // write to tempFile first and on success rename to destFile
    +                final Path tempFile = new Path(directoryPath, "." + filenameValue);
    +                final Path destFile = new Path(directoryPath, filenameValue);
    +
    +                final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
    +                final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
    +
    +                // if the tempFile or destFile already exists, and overwrite is set to false, then transfer to failure
    +                if (destinationExists && !shouldOverwrite) {
    +                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
    +                    getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
    +                    return null;
    +                }
    +
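    +                // the read callback below can only throw IOException directly, so any other
    +                // failure is captured in exceptionHolder and rethrown once the read completes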
    +                final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    +                final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    +
    +                final FlowFile flowFileIn = putFlowFile;
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // Read records from the incoming FlowFile and write them to the tempFile
    +                session.read(putFlowFile, (final InputStream rawIn) -> {
    +                    RecordReader recordReader = null;
    +                    HDFSRecordWriter recordWriter = null;
    +
    +                    try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
    +                        final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
    +                        recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
    +
    +                        // if we fail to create the RecordReader then we want to route to failure, so we need to
    +                        // handle this separately from the other IOExceptions which normally route to retry
    +                        try {
    +                            recordReader = recordReaderFactory.createRecordReader(flowFileIn, in, getLogger());
    +                        } catch (Exception e) {
    +                            final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", e);
    +                            exceptionHolder.set(rrfe);
    +                            return;
    +                        }
    +
    +                        final RecordSet recordSet = recordReader.createRecordSet();
    +                        writeResult.set(recordWriter.write(recordSet));
    +
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    } finally {
    +                        IOUtils.closeQuietly(recordReader);
    +                        IOUtils.closeQuietly(recordWriter);
    +                    }
    +                });
    +                stopWatch.stop();
    +
    +                final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
    +                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
    +                tempDotCopyFile = tempFile;
    +
    +                // if any errors happened within the session.read then throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
    +                rename(fileSystem, tempFile, destFile);
    +                changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
    +
    +                getLogger().info("Wrote {} to {} in {} milliseconds at a 
rate of {}", new Object[]{putFlowFile, destFile, millis, dataRate});
    +
    +                putFlowFile = postProcess(context, session, putFlowFile, 
destFile);
    +
    +                final String outputPath = destFile.toString();
    +                final String newFilename = destFile.getName();
    +                final String hdfsPath = destFile.getParent().toString();
    +
    +                // Update the filename and absolute path attributes
    +                final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(CoreAttributes.FILENAME.key(), newFilename);
    +                attributes.put(ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
    +                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
    +                putFlowFile = session.putAllAttributes(putFlowFile, attributes);
    +
    +                // Send a provenance event and transfer to success
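    +                // prepend the scheme with a single slash when the path is already absolute,
    +                // so the resulting URI does not contain "hdfs:///"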
    +                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
    +                session.getProvenanceReporter().send(putFlowFile, transitUri);
    +                session.transfer(putFlowFile, REL_SUCCESS);
    +
    +            } catch (IOException | FlowFileAccessException e) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new 
Object[]{e});
    +                session.transfer(session.penalize(putFlowFile), REL_RETRY);
    +                context.yield();
    +            } catch (Throwable t) {
    +                deleteQuietly(fileSystem, tempDotCopyFile);
    +                getLogger().error("Failed to write due to {}", new 
Object[]{t});
    +                session.transfer(putFlowFile, REL_FAILURE);
    +            }
    +
    +            return null;
    +        });
    +    }
    +
    +    /**
    +     * This method will be called after successfully writing to the destination file and renaming the file to its final name
    +     * in order to give sub-classes a chance to take action before transferring to success.
    +     *
    +     * @param context the context
    +     * @param session the session
    +     * @param flowFile the flow file being processed
    +     * @param destFile the destination file written to
    +     * @return an updated FlowFile reference
    +     */
    +    protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
    +        return flowFile;
    +    }
    +
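    +    /**
    +     * Attempts to rename srcFile to destFile, retrying up to 10 times with a short delay
    +     * between attempts to give transient causes of rename failure a chance to resolve.
    +     */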
    +    protected void rename(final FileSystem fileSystem, final Path srcFile, final Path destFile) throws IOException, InterruptedException, FailureException {
    +        boolean renamed = false;
    +        for (int i = 0; i < 10; i++) { // try to rename multiple times.
    +            if (fileSystem.rename(srcFile, destFile)) {
    +                renamed = true;
    +                break; // rename was successful
    +            }
    +            Thread.sleep(200L); // try waiting to let whatever might cause rename failure to resolve
    +        }
    +        if (!renamed) {
    --- End diff --
    
    I think a method Javadoc comment explaining that is fine. 
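    
    For reference, the contract this base class expects from sub-classes comes down
    to three hooks. A minimal sketch of a concrete processor (class names like
    ExampleRecordWriter are hypothetical, not part of this PR):
    
        public class PutExampleRecord extends AbstractPutHDFSRecord {
    
            // a single placeholder codec; a real processor would enumerate its supported codecs
            private static final AllowableValue NONE = new AllowableValue("NONE", "None");
    
            @Override
            public List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context) {
                return Collections.singletonList(NONE);
            }
    
            @Override
            public String getDefaultCompressionType(final ProcessorInitializationContext context) {
                return NONE.getValue();
            }
    
            @Override
            public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile,
                    final Configuration conf, final Path path, final RecordSchema schema) throws IOException {
                // ExampleRecordWriter is hypothetical: it would wrap the stream in a format-specific
                // writer implementing HDFSRecordWriter (write a RecordSet, return a WriteResult, close)
                return new ExampleRecordWriter(path.getFileSystem(conf).create(path), schema);
            }
        }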


> Add Put/Fetch Parquet Processors
> --------------------------------
>
>                 Key: NIFI-3724
>                 URL: https://issues.apache.org/jira/browse/NIFI-3724
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> Now that we have the record reader/writer services currently in master, it
> would be nice to have readers and writers for Parquet. Since Parquet's API is
> based on the Hadoop Path object, and not InputStreams/OutputStreams, we can't
> really implement direct conversions to and from Parquet in the middle of a
> flow, but we can perform the conversion by taking any record format and
> writing to a Path as Parquet, or reading Parquet from a Path and writing it
> out as another record format.
> We should add a PutParquet that uses a record reader and writes records to a 
> Path as Parquet, and a FetchParquet that reads Parquet from a path and writes 
> out records to a flow file using a record writer.
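
As a rough sketch of how the PutParquet half could plug into the base class above
(assuming parquet-avro's AvroParquetWriter builder and NiFi's AvroTypeUtil for the
schema conversion; AvroParquetHDFSRecordWriter is a hypothetical adapter name):

    @Override
    public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile,
            final Configuration conf, final Path path, final RecordSchema schema) throws IOException {
        final org.apache.avro.Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
        final ParquetWriter<GenericRecord> parquetWriter = AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(avroSchema)
                .withConf(conf)
                .build();
        // the adapter would convert each NiFi Record to an Avro GenericRecord and
        // delegate to the ParquetWriter; closing it finalizes the Parquet footer
        return new AvroParquetHDFSRecordWriter(parquetWriter, avroSchema);
    }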



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
