Github user alopresto commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1712#discussion_r114154289
--- Diff:
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base processor for reading data from HDFS that can be fetched into records.
+ */
+@TriggerWhenEmpty // trigger when empty so we have a chance to perform a
Kerberos re-login
+@DefaultSettings(yieldDuration = "100 ms") // decrease the default yield
since we are triggering when empty
+public abstract class AbstractFetchHDFSRecord extends
AbstractHadoopProcessor {
+
+    /**
+     * The HDFS file to fetch. Expression Language is evaluated against the incoming FlowFile; by default the
+     * value is built from the FlowFile's {@code path} and {@code filename} attributes.
+     */
+    public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+            .name("filename")
+            .displayName("Filename")
+            .description("The name of the file to retrieve")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("${path}/${filename}")
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+    /**
+     * The controller service used to create the {@link RecordSetWriter} that serializes the fetched records
+     * into the outgoing FlowFile content.
+     */
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The service for writing records to the FlowFile content")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    /** Receives FlowFiles whose content has been replaced with the records read from the HDFS file. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles will be routed to this relationship "
+                    + "once they have been updated with the content of the file")
+            .build();
+
+    /** Receives FlowFiles whose fetch failed in a way that retrying is unlikely to fix. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved "
+                    + "and trying again will likely not be helpful. "
+                    + "This would occur, for instance, if the file is not found or if there is a permissions issue")
+            .build();
+
+    /** Receives FlowFiles whose fetch failed in a way that may succeed if attempted again later. */
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved, "
+                    + "but might be able to be in the future if tried again. "
+                    + "This generally indicates that the Fetch should be tried again.")
+            .build();
+
+    /** Name of the FlowFile attribute for the reason a fetch failed. */
+    public static final String FETCH_FAILURE_REASON_ATTR = "fetch.failure.reason";
+    /** Name of the FlowFile attribute holding the number of records written to the outgoing FlowFile. */
+    public static final String RECORD_COUNT_ATTR = "record.count";
+
+    // Built once in init() and exposed through the final getters as unmodifiable collections.
+    private volatile Set<Relationship> fetchHdfsRecordRelationships;
+    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;
+
+    /**
+     * Builds the relationship set and the supported property list for this processor. The property list is the
+     * base {@code properties}, followed by {@link #FILENAME} and {@link #RECORD_WRITER}, followed by whatever
+     * {@link #getAdditionalProperties()} returns. Both collections are stored as unmodifiable views.
+     *
+     * @param context the initialization context supplied by the framework
+     */
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+        super.init(context);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_RETRY);
+        rels.add(REL_FAILURE);
+        this.fetchHdfsRecordRelationships = Collections.unmodifiableSet(rels);
+
+        // 'properties' is not declared in this class; presumably the base Hadoop properties from AbstractHadoopProcessor.
+        final List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(FILENAME);
+        props.add(RECORD_WRITER);
+        props.addAll(getAdditionalProperties());
+        this.fetchHdfsRecordProperties = Collections.unmodifiableList(props);
+    }
+
+    /**
+     * Allows sub-classes to add additional properties. Called from {@code init}, after the base properties,
+     * {@link #FILENAME}, and {@link #RECORD_WRITER} have been added to the list.
+     *
+     * @return additional properties to add to the overall list; empty by default
+     */
+    public List<PropertyDescriptor> getAdditionalProperties() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Returns the success, retry, and failure relationships assembled in {@code init}.
+     *
+     * @return the unmodifiable relationship set
+     */
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.fetchHdfsRecordRelationships;
+    }
+
+    /**
+     * Returns the property list assembled in {@code init}, including any sub-class additions.
+     *
+     * @return the unmodifiable list of supported property descriptors
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return fetchHdfsRecordProperties;
+    }
+
+ /**
+ * Sub-classes provide the appropriate HDFSRecordReader.
+ *
+ * @param context the process context to obtain additional
configuration
+ * @param flowFile the flow file being written
+ * @param conf the Configuration instance
+ * @param path the path to write to
+ * @return the HDFSRecordWriter
+ * @throws IOException if an error occurs creating the writer
+ */
+ public abstract HDFSRecordReader createHDFSRecordReader(final
ProcessContext context, final FlowFile flowFile, final Configuration conf,
final Path path)
+ throws IOException;
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ // do this before getting a flow file so that we always get a
chance to attempt Kerberos relogin
+ final FileSystem fileSystem = getFileSystem();
+ final Configuration configuration = getConfiguration();
+ final UserGroupInformation ugi = getUserGroupInformation();
+
+ if (configuration == null || fileSystem == null || ugi == null) {
+ getLogger().error("Processor not configured properly because
Configuration, FileSystem, or UserGroupInformation was null");
+ context.yield();
+ return;
+ }
+
+ final FlowFile originalFlowFile = session.get();
+ if (originalFlowFile == null ) {
+ context.yield();
+ return;
+ }
+
+
+ ugi.doAs((PrivilegedAction<Object>)() -> {
+ FlowFile child = null;
+ final String filenameValue =
context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
+ try {
+ final Path path = new Path(filenameValue);
+ final AtomicReference<Throwable> exceptionHolder = new
AtomicReference<>(null);
+ final AtomicReference<WriteResult> writeResult = new
AtomicReference<>();
+
+ final RecordSetWriterFactory recordSetWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final RecordSetWriter recordSetWriter =
recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new
NullInputStream(0));
+
+ final StopWatch stopWatch = new StopWatch(true);
+
+ // use a child FlowFile so that if any error occurs we can
route the original untouched FlowFile to retry/failure
+ child = session.create(originalFlowFile);
+ child = session.write(child, (final OutputStream rawOut)
-> {
+ try (final BufferedOutputStream out = new
BufferedOutputStream(rawOut);
+ final HDFSRecordReader recordReader =
createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
+
+ final RecordSchema emptySchema = new
SimpleRecordSchema(Collections.emptyList());
+
+ final RecordSet recordSet = new RecordSet() {
+ @Override
+ public RecordSchema getSchema() throws
IOException {
+ return emptySchema;
+ }
+
+ @Override
+ public Record next() throws IOException {
+ return recordReader.nextRecord();
+ }
+ };
+
+ writeResult.set(recordSetWriter.write(recordSet,
out));
+ } catch (Exception e) {
+ exceptionHolder.set(e);
+ }
+ });
+
+ stopWatch.stop();
+
+ // if any errors happened within the session.write then
throw the exception so we jump
+ // into one of the appropriate catch blocks below
+ if (exceptionHolder.get() != null) {
+ throw exceptionHolder.get();
+ }
+
+ FlowFile successFlowFile = postProcess(context, session,
child, path);
+
+ final Map<String,String> attributes = new
HashMap<>(writeResult.get().getAttributes());
+ attributes.put(RECORD_COUNT_ATTR,
String.valueOf(writeResult.get().getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
recordSetWriter.getMimeType());
+ successFlowFile =
session.putAllAttributes(successFlowFile, attributes);
+
+ final URI uri = path.toUri();
+ getLogger().info("Successfully received content from {}
for {} in {}", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
--- End diff --
Add the unit (`milliseconds`) to the duration in this log message.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---