Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/379#discussion_r67956421
--- Diff:
nifi-nar-bundles/nifi-alluxio-bundle/nifi-alluxio-processors/src/main/java/org/apache/nifi/processors/alluxio/GetAlluxio.java
---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.alluxio;
+
+import alluxio.AlluxioURI;
+import alluxio.client.ReadType;
+import alluxio.client.file.FileInStream;
+import alluxio.client.file.URIStatus;
+import alluxio.client.file.options.OpenFileOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"alluxio", "tachyon", "get", "file"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("This processor will access the file using the
input URI provided and write the content of "
+ + "the remote file to the content of the incoming FlowFile.")
+public class GetAlluxio extends AbstractAlluxioProcessor {
+ /**
+ * Processor property selecting the Alluxio read type to use when accessing
+ * the remote file. The property is required, its allowable values are the
+ * constants of the ReadType enum, and it defaults to CACHE_PROMOTE.
+ */
+ public static final PropertyDescriptor READ_TYPE = new
PropertyDescriptor.Builder()
+ .name("alluxio-read-type")
+ .displayName("Read type")
+ .description("The Read Type to use when accessing the remote
file")
+ .defaultValue(ReadType.CACHE_PROMOTE.toString())
+ .required(true)
+ .allowableValues(ReadType.values())
+ .build();
+ /* Property list returned by getSupportedPropertyDescriptors(); assigned once in the static initializer. */
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ /*
+ * Relationships.
+ * "success": files successfully retrieved.
+ * "failure": flow files routed here in case of failure; built with
+ * autoTerminateDefault(true).
+ * "original": the original (incoming) FlowFile in case of success; built
+ * with autoTerminateDefault(true).
+ */
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("All files successfully retrieved are routed to
this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("In case of failure, flow files will be routed to
this relationship")
+ .autoTerminateDefault(true)
+ .build();
+ public static final Relationship REL_SUCCESS_REQ = new
Relationship.Builder()
+ .name("original")
+ .description("In case of success, the original FlowFile will
be routed to this relationship")
+ .autoTerminateDefault(true)
+ .build();
+ /* Relationship set returned by getRelationships(); assigned once in the static initializer. */
+ private final static Set<Relationship> relationships;
+
+
+ /*
+ * Ensures that the list of property descriptors is built only once, at
+ * class initialization: the entries of `descriptors` (declared outside this
+ * excerpt, presumably inherited from AbstractAlluxioProcessor - TODO confirm)
+ * followed by READ_TYPE.
+ * Also creates the Set of relationships (success, failure, original).
+ * Both collections are wrapped as unmodifiable before being assigned to the
+ * static fields.
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.addAll(descriptors);
+ _propertyDescriptors.add(READ_TYPE);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ _relationships.add(REL_SUCCESS_REQ);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+ /**
+ * Returns the unmodifiable property descriptor list that the static
+ * initializer assembled.
+ *
+ * @return the value of the static propertyDescriptors field
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+ /**
+ * Returns the unmodifiable relationship set that the static initializer
+ * assembled (success, failure, original).
+ *
+ * @return the value of the static relationships field
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+ * Lifecycle hook carrying the OnScheduled annotation. Delegates to
+ * createFileSystem(context), which is defined outside this excerpt
+ * (presumably it initializes the fileSystem reference that onTrigger
+ * reads - TODO confirm).
+ *
+ * @param context the process context passed on to createFileSystem
+ */
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ createFileSystem(context);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile request = null;
+ if (context.hasIncomingConnection()) {
+ request = session.get();
+
+ // If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections
coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (request == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final String uri =
context.getProperty(URI).evaluateAttributeExpressions(request).getValue();
+ final AlluxioURI path = new AlluxioURI(uri);
+ final OpenFileOptions options =
OpenFileOptions.defaults().setReadType(ReadType.valueOf(context.getProperty(READ_TYPE).getValue()));
+
+ FileInStream in = null;
+ FlowFile flowFile = null;
+
+ // With no incoming FlowFile there is no parent, so create a fresh FlowFile.
+ // Otherwise derive the new FlowFile from the request so that it is created
+ // as a child of the incoming FlowFile. session.create(parent) must never be
+ // handed a null parent.
+ if(request == null) {
+ flowFile = session.create();
+ } else {
+ flowFile = session.create(request);
+ }
+
+ try {
+ final URIStatus status = fileSystem.get().getStatus(path);
+ flowFile = updateFlowFile(status, flowFile, session);
+
+ in = fileSystem.get().openFile(path, options);
+ final FileInStream toCopy = in;
+
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ IOUtils.copy(toCopy, out);
--- End diff --
What do you mean/suggest?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---