[
https://issues.apache.org/jira/browse/NIFI-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470503#comment-16470503
]
ASF GitHub Bot commented on NIFI-5166:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2686#discussion_r187353741
--- Diff:
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification",
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j)
model and the content of a FlowFile. "
+ + "The processor supports both classification and regression by
extracting the record from the FlowFile body and applying the model. "
+ + "The processor supports batch by allowing multiple records to be
passed in the FlowFile body with each record separated by the 'Record
Separator' property. "
+ + "Each record can contain multiple fields with each field separated
by the 'Field Separator' property."
+ )
+@WritesAttributes({
+ @WritesAttribute(attribute =
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description =
"Deeplearning4J error message"),
+ @WritesAttribute(attribute =
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description =
"Deeplearning4J output shape"),
+ })
+public class DeepLearning4JPredictor extends
AbstractDeepLearning4JProcessor {
+
+ static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
+ .description("Successful DeepLearning4j results are routed to
this relationship").build();
+
+ static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
+ .description("Failed DeepLearning4j results are routed to this
relationship").build();
+
+ protected Gson gson = new Gson();
+
+ private static final Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> propertyDescriptors;
+ static {
+ final Set<Relationship> tempRelationships = new HashSet<>();
+ tempRelationships.add(REL_SUCCESS);
+ tempRelationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(tempRelationships);
+ final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
+ tempDescriptors.add(MODEL_FILE);
+ tempDescriptors.add(RECORD_DIMENSIONS);
+ tempDescriptors.add(CHARSET);
+ tempDescriptors.add(FIELD_SEPARATOR);
+ tempDescriptors.add(RECORD_SEPARATOR);
+ propertyDescriptors =
Collections.unmodifiableList(tempDescriptors);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+ if ( flowFile.getSize() == 0 ) {
+ String message = "FlowFile query is empty";
+ getLogger().error(message);
+ flowFile = session.putAttribute(flowFile,
DEEPLEARNING4J_ERROR_MESSAGE, message);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ String input = null;
+ try {
+ input = getFlowFileContents(session, charset, flowFile);
+ String fieldSeparator =
context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+ String recordSeparator =
context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+
+ int [] dimensions = getInputDimensions(context, charset,
flowFile, fieldSeparator);
+
+ if ( getLogger().isDebugEnabled() ) {
+ getLogger().debug("Received input {} with dimensions {}",
new Object[] { input, dimensions });
+ }
+
+ MultiLayerNetwork model = getModel(context);
+
+ long startTimeMillis = System.currentTimeMillis();
+
+ String [] inputRecords = input.split(recordSeparator);
+
+ List<INDArray> features = Arrays.stream(inputRecords).map(
+ record -> {
+ double [] parameters =
Arrays.stream(record.split(fieldSeparator)).mapToDouble(
+ field -> Double.parseDouble(field)).toArray();
+
+ INDArray featureInput = Nd4j.create(parameters,
dimensions);
+
+ if ( getLogger().isDebugEnabled() ) {
+ getLogger().debug("Features for record {}
parameters {} dims {} featureInput {} ",
+ new Object[] {record, parameters, dimensions,
featureInput});
+ }
+
+ return featureInput;
+
+ }).collect(Collectors.toList());
+
+ INDArray allFeatures = Nd4j.vstack(features);
+
+ INDArray results = model.output(allFeatures);
+
+ double [][] partitionedResults = new
double[inputRecords.length][];
+ for (int row = 0; row < inputRecords.length; row++) {
+ INDArray result = results.getRow(row);
+ partitionedResults[row] =
Nd4j.toFlattened(result).toDoubleVector();
+ }
+
+ String jsonResult = gson.toJson(partitionedResults);
+ int [] shape = results.shape();
+ String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1,
shape.length));
+
+ if ( getLogger().isDebugEnabled() ) {
+ getLogger().debug("Prediction for inputRecords {}, dims
{}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape
{}, jsonShape {}",
+ new Object[] {inputRecords, dimensions, results,
Arrays.toString(results.shape()), partitionedResults, jsonResult, shape,
jsonShape});
+ }
+
+ try (ByteArrayInputStream bais = new
ByteArrayInputStream(jsonResult.getBytes(charset))) {
+ flowFile = session.importFrom(bais, flowFile);
+ }
+
+ session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE,
jsonShape);
+
+ final long endTimeMillis = System.currentTimeMillis();
+
+ session.transfer(flowFile, REL_SUCCESS);
+
+ session.getProvenanceReporter().send(flowFile,
makeProvenanceUrl(context),
--- End diff --
I don't believe a SEND event is appropriate here. We are not sending the
data to some external location; I think we want a CONTENT_MODIFIED event instead.
It probably does make sense, though, to include the time duration, and if you
want to include the name of the Model File, that could go in the event details.
> Create deep learning classification and regression processor
> ------------------------------------------------------------
>
> Key: NIFI-5166
> URL: https://issues.apache.org/jira/browse/NIFI-5166
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Reporter: Mans Singh
> Assignee: Mans Singh
> Priority: Minor
> Labels: Learning, classification, deep, regression
> Fix For: 1.7.0
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> We need a deep learning classification and regression processor.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)