Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2686#discussion_r187357216
--- Diff:
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification",
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j)
model and the content of a FlowFile. "
+ + "The processor supports both classification and regression by
extracting the record from the FlowFile body and applying the model. "
+ + "The processor supports batch by allowing multiple records to be
passed in the FlowFile body with each record separated by the 'Record
Separator' property. "
+ + "Each record can contain multiple fields with each field separated
by the 'Field Separator' property."
+ )
+@WritesAttributes({
+ @WritesAttribute(attribute =
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description =
"Deeplearning4J error message"),
+ @WritesAttribute(attribute =
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description =
"Deeplearning4J output shape"),
+ })
+public class DeepLearning4JPredictor extends
AbstractDeepLearning4JProcessor {
+
+ static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("Successful DeepLearning4j results are routed to
this relationship").build();
+
+ static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("Failed DeepLearning4j results are routed to this
relationship").build();
+
+ protected Gson gson = new Gson();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+ static {
+ final Set<Relationship> tempRelationships = new HashSet<>();
+ tempRelationships.add(REL_SUCCESS);
+ tempRelationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(tempRelationships);
+ final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
+ tempDescriptors.add(MODEL_FILE);
+ tempDescriptors.add(RECORD_DIMENSIONS);
+ tempDescriptors.add(CHARSET);
+ tempDescriptors.add(FIELD_SEPARATOR);
+ tempDescriptors.add(RECORD_SEPARATOR);
+ propertyDescriptors =
Collections.unmodifiableList(tempDescriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+ if ( flowFile.getSize() == 0 ) {
+ String message = "FlowFile query is empty";
+ getLogger().error(message);
+ flowFile = session.putAttribute(flowFile,
DEEPLEARNING4J_ERROR_MESSAGE, message);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ String input = null;
+ try {
+ input = getFlowFileContents(session, charset, flowFile);
+ String fieldSeparator =
context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+ String recordSeparator =
context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+
+ int [] dimensions = getInputDimensions(context, charset,
flowFile, fieldSeparator);
+
+ if ( getLogger().isDebugEnabled() ) {
+ getLogger().debug("Received input {} with dimensions {}",
new Object[] { input, dimensions });
+ }
+
+ MultiLayerNetwork model = getModel(context);
+
+ long startTimeMillis = System.currentTimeMillis();
+
+ String [] inputRecords = input.split(recordSeparator);
+
+ List<INDArray> features = Arrays.stream(inputRecords).map(
+ record -> {
+ double [] parameters =
Arrays.stream(record.split(fieldSeparator)).mapToDouble(
+ field -> Double.parseDouble(field)).toArray();
+
+ INDArray featureInput = Nd4j.create(parameters,
dimensions);
+
+ if ( getLogger().isDebugEnabled() ) {
+ getLogger().debug("Features for record {}
parameters {} dims {} featureInput {} ",
+ new Object[] {record, parameters, dimensions,
featureInput});
+ }
+
+ return featureInput;
+
+ }).collect(Collectors.toList());
+
+ INDArray allFeatures = Nd4j.vstack(features);
+
+ INDArray results = model.output(allFeatures);
+
+ double [][] partitionedResults = new
double[inputRecords.length][];
+ for (int row = 0; row < inputRecords.length; row++) {
+ INDArray result = results.getRow(row);
+ partitionedResults[row] =
Nd4j.toFlattened(result).toDoubleVector();
+ }
+
+ String jsonResult = gson.toJson(partitionedResults);
+ int [] shape = results.shape();
+ String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1,
shape.length));
+
+ if ( getLogger().isDebugEnabled() ) {
+ getLogger().debug("Prediction for inputRecords {}, dims
{}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape
{}, jsonShape {}",
+ new Object[] {inputRecords, dimensions, results,
Arrays.toString(results.shape()), partitionedResults, jsonResult, shape,
jsonShape});
+ }
+
+ try (ByteArrayInputStream bais = new
ByteArrayInputStream(jsonResult.getBytes(charset))) {
+ flowFile = session.importFrom(bais, flowFile);
+ }
+
+ session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE,
jsonShape);
+
+ final long endTimeMillis = System.currentTimeMillis();
+
+ session.transfer(flowFile, REL_SUCCESS);
+
+ session.getProvenanceReporter().send(flowFile,
makeProvenanceUrl(context),
+ (endTimeMillis - startTimeMillis));
+ } catch (Exception exception) {
+ flowFile = populateErrorAttributes(session, flowFile,
exception.getMessage());
+ getLogger().error("Failed to process data due to {} for
input {}",
+ new Object[]{exception.getLocalizedMessage(),
input}, exception);
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
--- End diff --
I don't think context.yield() is appropriate here. There is no condition
occurring that would result in the processor being unable to perform its task
for some period of time, after which it would work again. That's what
context.yield() is for. In this case, it is likely that the processor is
misconfigured or that the data is invalid, correct? So in that case, I would
just route to failure and continue on.
---