[ 
https://issues.apache.org/jira/browse/NIFI-3724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991088#comment-15991088
 ] 

ASF GitHub Bot commented on NIFI-3724:
--------------------------------------

Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1712#discussion_r114154289
  
    --- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
 ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import org.apache.commons.io.input.NullInputStream;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.security.AccessControlException;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.configuration.DefaultSettings;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.net.URI;
    +import java.security.PrivilegedAction;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Base processor for reading a data from HDFS that can be fetched into 
records.
    + */
    +@TriggerWhenEmpty // trigger when empty so we have a chance to perform a 
Kerberos re-login
    +@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield 
since we are triggering when empty
    +public abstract class AbstractFetchHDFSRecord extends 
AbstractHadoopProcessor {
    +
    +    public static final PropertyDescriptor FILENAME = new 
PropertyDescriptor.Builder()
    +            .name("filename")
    +            .displayName("Filename")
    +            .description("The name of the file to retrieve")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("${path}/${filename}")
    +            
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("The service for writing records to the FlowFile 
content")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles will be routed to this relationship 
once they have been updated with the content of the file")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles will be routed to this relationship if 
the content of the file cannot be retrieved and trying again will likely not be 
helpful. "
    +                    + "This would occur, for instance, if the file is not 
found or if there is a permissions issue")
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("FlowFiles will be routed to this relationship if 
the content of the file cannot be retrieved, but might be able to be in the 
future if tried again. "
    +                    + "This generally indicates that the Fetch should be 
tried again.")
    +            .build();
    +
    +    public static final String FETCH_FAILURE_REASON_ATTR = 
"fetch.failure.reason";
    +    public static final String RECORD_COUNT_ATTR = "record.count";
    +
    +    private volatile Set<Relationship> fetchHdfsRecordRelationships;
    +    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
    +
    +    @Override
    +    protected final void init(final ProcessorInitializationContext 
context) {
    +        super.init(context);
    +
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_RETRY);
    +        rels.add(REL_FAILURE);
    +        this.fetchHdfsRecordRelationships = 
Collections.unmodifiableSet(rels);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(FILENAME);
    +        props.add(RECORD_WRITER);
    +        props.addAll(getAdditionalProperties());
    +        this.fetchHdfsRecordProperties = 
Collections.unmodifiableList(props);
    +    }
    +
    +    /**
    +     * Allows sub-classes to add additional properties, called from 
initialize.
    +     *
    +     * @return additional properties to add to the overall list
    +     */
    +    public List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.emptyList();
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return fetchHdfsRecordRelationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return fetchHdfsRecordProperties;
    +    }
    +
    +    /**
    +     * Sub-classes provide the appropriate HDFSRecordReader.
    +     *
    +     * @param context the process context to obtain additional 
configuration
    +     * @param flowFile the flow file being written
    +     * @param conf the Configuration instance
    +     * @param path the path to write to
    +     * @return the HDFSRecordWriter
    +     * @throws IOException if an error occurs creating the writer
    +     */
    +    public abstract HDFSRecordReader createHDFSRecordReader(final 
ProcessContext context, final FlowFile flowFile, final Configuration conf, 
final Path path)
    +            throws IOException;
    +
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        // do this before getting a flow file so that we always get a 
chance to attempt Kerberos relogin
    +        final FileSystem fileSystem = getFileSystem();
    +        final Configuration configuration = getConfiguration();
    +        final UserGroupInformation ugi = getUserGroupInformation();
    +
    +        if (configuration == null || fileSystem == null || ugi == null) {
    +            getLogger().error("Processor not configured properly because 
Configuration, FileSystem, or UserGroupInformation was null");
    +            context.yield();
    +            return;
    +        }
    +
    +        final FlowFile originalFlowFile = session.get();
    +        if (originalFlowFile == null ) {
    +            context.yield();
    +            return;
    +        }
    +
    +
    +        ugi.doAs((PrivilegedAction<Object>)() -> {
    +            FlowFile child = null;
    +            final String filenameValue = 
context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
    +            try {
    +                final Path path = new Path(filenameValue);
    +                final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>(null);
    +                final AtomicReference<WriteResult> writeResult = new 
AtomicReference<>();
    +
    +                final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    +                final RecordSetWriter recordSetWriter = 
recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new 
NullInputStream(0));
    +
    +                final StopWatch stopWatch = new StopWatch(true);
    +
    +                // use a child FlowFile so that if any error occurs we can 
route the original untouched FlowFile to retry/failure
    +                child = session.create(originalFlowFile);
    +                child = session.write(child, (final OutputStream rawOut) 
-> {
    +                    try (final BufferedOutputStream out = new 
BufferedOutputStream(rawOut);
    +                         final HDFSRecordReader recordReader = 
createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
    +
    +                        final RecordSchema emptySchema = new 
SimpleRecordSchema(Collections.emptyList());
    +
    +                        final RecordSet recordSet = new RecordSet() {
    +                            @Override
    +                            public RecordSchema getSchema() throws 
IOException {
    +                                return emptySchema;
    +                            }
    +
    +                            @Override
    +                            public Record next() throws IOException {
    +                                return recordReader.nextRecord();
    +                            }
    +                        };
    +
    +                        writeResult.set(recordSetWriter.write(recordSet, 
out));
    +                    } catch (Exception e) {
    +                        exceptionHolder.set(e);
    +                    }
    +                });
    +
    +                stopWatch.stop();
    +
    +                // if any errors happened within the session.write then 
throw the exception so we jump
    +                // into one of the appropriate catch blocks below
    +                if (exceptionHolder.get() != null) {
    +                    throw exceptionHolder.get();
    +                }
    +
    +                FlowFile successFlowFile = postProcess(context, session, 
child, path);
    +
    +                final Map<String,String> attributes = new 
HashMap<>(writeResult.get().getAttributes());
    +                attributes.put(RECORD_COUNT_ATTR, 
String.valueOf(writeResult.get().getRecordCount()));
    +                attributes.put(CoreAttributes.MIME_TYPE.key(), 
recordSetWriter.getMimeType());
    +                successFlowFile = 
session.putAllAttributes(successFlowFile, attributes);
    +
    +                final URI uri = path.toUri();
    +                getLogger().info("Successfully received content from {} 
for {} in {}", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
    --- End diff --
    
    Add unit of `milliseconds` to the log output of the duration. 


> Add Put/Fetch Parquet Processors
> --------------------------------
>
>                 Key: NIFI-3724
>                 URL: https://issues.apache.org/jira/browse/NIFI-3724
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> Now that we have the record reader/writer services currently in master, it 
> would be nice to have reader and writers for Parquet. Since Parquet's API is 
> based on the Hadoop Path object, and not InputStreams/OutputStreams, we can't 
> really implement direct conversions to and from Parquet in the middle of a 
> flow, but we can we can perform the conversion by taking any record format 
> and writing to a Path as Parquet, or reading Parquet from a Path and writing 
> it out as another record format.
> We should add a PutParquet that uses a record reader and writes records to a 
> Path as Parquet, and a FetchParquet that reads Parquet from a path and writes 
> out records to a flow file using a record writer.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to