Github user apiri commented on a diff in the pull request:
https://github.com/apache/nifi/pull/379#discussion_r67958810
--- Diff:
nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java
---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.alluxio;
+
+import alluxio.AlluxioURI;
+import alluxio.client.ReadType;
+import alluxio.client.file.FileInStream;
+import alluxio.client.file.URIStatus;
+import alluxio.client.file.options.OpenFileOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"alluxio", "tachyon", "get", "file"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("This processor will access the file using the
input URI provided and write the content of "
+ + "the remote file to the content of the incoming FlowFile.")
+public class GetAlluxio extends AbstractAlluxioProcessor {
+
+ public static final PropertyDescriptor READ_TYPE = new
PropertyDescriptor.Builder()
+ .name("alluxio-read-type")
+ .displayName("Read type")
+ .description("The Read Type to use when accessing the remote
file")
+ .defaultValue(ReadType.CACHE_PROMOTE.toString())
+ .required(true)
+ .allowableValues(ReadType.values())
+ .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("All files successfully retrieved are routed to
this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("In case of failure, flow files will be routed to
this relationship")
+ .autoTerminateDefault(true)
+ .build();
+ public static final Relationship REL_SUCCESS_REQ = new
Relationship.Builder()
+ .name("original")
+ .description("In case of success, the original FlowFile will
be routed to this relationship")
+ .autoTerminateDefault(true)
+ .build();
+
+ private final static Set<Relationship> relationships;
+
+
+ /*
+ * Will ensure that the list of property descriptors is built only
once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.addAll(descriptors);
+ _propertyDescriptors.add(READ_TYPE);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ _relationships.add(REL_SUCCESS_REQ);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ createFileSystem(context);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile request = null;
+ if (context.hasIncomingConnection()) {
+ request = session.get();
+
+ // If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections
coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (request == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final String uri =
context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
+ final AlluxioURI path = new AlluxioURI(uri);
+ final OpenFileOptions options =
OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
+
+ FileInStream in = null;
+ FlowFile flowFile = null;
+
+ if(request == null) {
+ flowFile = session.create(request);
+ } else {
+ flowFile = session.create();
+ }
+
+ try {
+ final URIStatus status = fileSystem.get().getStatus(path);
+ flowFile = updateFlowFile(status, flowFile, session);
+
+ in = fileSystem.get().openFile(path, options);
+ final FileInStream toCopy = in;
+
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ IOUtils.copy(toCopy, out);
--- End diff --
I'll have to scope this out again. After the fact I realized that it may
have just been the bulletin bug on master that was preventing bulletins
from working. When I next get the opportunity, I'll see if that's still the case.
For some additional context, the logs, which I now realize I failed to
capture, mentioned the Alluxio logger. I was curious whether that was
somehow interfering with NiFi's logger and preventing bulletins. I'll let you
know if I can recreate the issue.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---