[
https://issues.apache.org/jira/browse/NIFI-3724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991088#comment-15991088
]
ASF GitHub Bot commented on NIFI-3724:
--------------------------------------
Github user alopresto commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1712#discussion_r114154289
--- Diff:
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base processor for reading data from HDFS that can be fetched into
records.
+ */
+@TriggerWhenEmpty // trigger when empty so we have a chance to perform a
Kerberos re-login
+@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield
since we are triggering when empty
+public abstract class AbstractFetchHDFSRecord extends
AbstractHadoopProcessor {
+
+ public static final PropertyDescriptor FILENAME = new
PropertyDescriptor.Builder()
+ .name("filename")
+ .displayName("Filename")
+ .description("The name of the file to retrieve")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("${path}/${filename}")
+
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("The service for writing records to the FlowFile
content")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles will be routed to this relationship
once they have been updated with the content of the file")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles will be routed to this relationship if
the content of the file cannot be retrieved and trying again will likely not be
helpful. "
+ + "This would occur, for instance, if the file is not
found or if there is a permissions issue")
+ .build();
+
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("FlowFiles will be routed to this relationship if
the content of the file cannot be retrieved, but might be able to be in the
future if tried again. "
+ + "This generally indicates that the Fetch should be
tried again.")
+ .build();
+
+ public static final String FETCH_FAILURE_REASON_ATTR =
"fetch.failure.reason";
+ public static final String RECORD_COUNT_ATTR = "record.count";
+
+ private volatile Set<Relationship> fetchHdfsRecordRelationships;
+ private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
+
+ @Override
+ protected final void init(final ProcessorInitializationContext
context) {
+ super.init(context);
+
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_RETRY);
+ rels.add(REL_FAILURE);
+ this.fetchHdfsRecordRelationships =
Collections.unmodifiableSet(rels);
+
+ final List<PropertyDescriptor> props = new ArrayList<>(properties);
+ props.add(FILENAME);
+ props.add(RECORD_WRITER);
+ props.addAll(getAdditionalProperties());
+ this.fetchHdfsRecordProperties =
Collections.unmodifiableList(props);
+ }
+
+ /**
+ * Allows sub-classes to add additional properties; called from
init().
+ *
+ * @return additional properties to add to the overall list
+ */
+ public List<PropertyDescriptor> getAdditionalProperties() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final Set<Relationship> getRelationships() {
+ return fetchHdfsRecordRelationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return fetchHdfsRecordProperties;
+ }
+
+ /**
+ * Sub-classes provide the appropriate HDFSRecordReader.
+ *
+ * @param context the process context to obtain additional
configuration
+ * @param flowFile the original flow file being processed
+ * @param conf the Configuration instance
+ * @param path the path to read from
+ * @return the HDFSRecordReader
+ * @throws IOException if an error occurs creating the reader
+ */
+ public abstract HDFSRecordReader createHDFSRecordReader(final
ProcessContext context, final FlowFile flowFile, final Configuration conf,
final Path path)
+ throws IOException;
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ // do this before getting a flow file so that we always get a
chance to attempt Kerberos relogin
+ final FileSystem fileSystem = getFileSystem();
+ final Configuration configuration = getConfiguration();
+ final UserGroupInformation ugi = getUserGroupInformation();
+
+ if (configuration == null || fileSystem == null || ugi == null) {
+ getLogger().error("Processor not configured properly because
Configuration, FileSystem, or UserGroupInformation was null");
+ context.yield();
+ return;
+ }
+
+ final FlowFile originalFlowFile = session.get();
+ if (originalFlowFile == null ) {
+ context.yield();
+ return;
+ }
+
+
+ ugi.doAs((PrivilegedAction<Object>)() -> {
+ FlowFile child = null;
+ final String filenameValue =
context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
+ try {
+ final Path path = new Path(filenameValue);
+ final AtomicReference<Throwable> exceptionHolder = new
AtomicReference<>(null);
+ final AtomicReference<WriteResult> writeResult = new
AtomicReference<>();
+
+ final RecordSetWriterFactory recordSetWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final RecordSetWriter recordSetWriter =
recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new
NullInputStream(0));
+
+ final StopWatch stopWatch = new StopWatch(true);
+
+ // use a child FlowFile so that if any error occurs we can
route the original untouched FlowFile to retry/failure
+ child = session.create(originalFlowFile);
+ child = session.write(child, (final OutputStream rawOut)
-> {
+ try (final BufferedOutputStream out = new
BufferedOutputStream(rawOut);
+ final HDFSRecordReader recordReader =
createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
+
+ final RecordSchema emptySchema = new
SimpleRecordSchema(Collections.emptyList());
+
+ final RecordSet recordSet = new RecordSet() {
+ @Override
+ public RecordSchema getSchema() throws
IOException {
+ return emptySchema;
+ }
+
+ @Override
+ public Record next() throws IOException {
+ return recordReader.nextRecord();
+ }
+ };
+
+ writeResult.set(recordSetWriter.write(recordSet,
out));
+ } catch (Exception e) {
+ exceptionHolder.set(e);
+ }
+ });
+
+ stopWatch.stop();
+
+ // if any errors happened within the session.write then
throw the exception so we jump
+ // into one of the appropriate catch blocks below
+ if (exceptionHolder.get() != null) {
+ throw exceptionHolder.get();
+ }
+
+ FlowFile successFlowFile = postProcess(context, session,
child, path);
+
+ final Map<String,String> attributes = new
HashMap<>(writeResult.get().getAttributes());
+ attributes.put(RECORD_COUNT_ATTR,
String.valueOf(writeResult.get().getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
recordSetWriter.getMimeType());
+ successFlowFile =
session.putAllAttributes(successFlowFile, attributes);
+
+ final URI uri = path.toUri();
+ getLogger().info("Successfully received content from {}
for {} in {}", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
--- End diff --
Add unit of `milliseconds` to the log output of the duration.
> Add Put/Fetch Parquet Processors
> --------------------------------
>
> Key: NIFI-3724
> URL: https://issues.apache.org/jira/browse/NIFI-3724
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 1.2.0
>
>
> Now that we have the record reader/writer services in master, it
> would be nice to have readers and writers for Parquet. Since Parquet's API is
> based on the Hadoop Path object, and not InputStreams/OutputStreams, we can't
> really implement direct conversions to and from Parquet in the middle of a
> flow, but we can perform the conversion by taking any record format
> and writing it to a Path as Parquet, or by reading Parquet from a Path and writing
> it out as another record format.
> We should add a PutParquet that uses a record reader and writes records to a
> Path as Parquet, and a FetchParquet that reads Parquet from a path and writes
> out records to a flow file using a record writer.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)