This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 58905c2  MINIFICPP-809: Better support python capabilities and 
recordreader/writers
58905c2 is described below

commit 58905c29ab563af4c579eb6467d17b80984e5ea2
Author: Marc Parisi <[email protected]>
AuthorDate: Fri Apr 19 11:27:01 2019 -0400

    MINIFICPP-809: Better support python capabilities and recordreader/writers
    
    MINIFICPP-809: better support append
    
    MINIFICPP-809: Add google sentiment analyzer example
    
    MINIFICPP-809: Fix linter issues
    
    MINIFICPP-809: Resolving attribute issue seen during demo
    
    This closes #540.
    
    Signed-off-by: Marc Parisi <[email protected]>
---
 extensions/jni/ExecuteJavaControllerService.cpp    |   6 +-
 extensions/jni/ExecuteJavaProcessor.cpp            |   7 +-
 extensions/jni/jvm/JavaControllerService.h         |   9 ++
 extensions/jni/jvm/JniProcessSession.cpp           |  15 ++-
 extensions/jni/jvm/NarClassLoader.h                |  52 ++++++++
 extensions/jni/nifi-framework-jni/pom.xml          |   1 -
 .../org/apache/nifi/processor/JniClassLoader.java  | 136 ++++++++++++++++-----
 .../apache/nifi/processor/JniProcessSession.java   |  68 +++++++++--
 .../{ => examples}/SentimentAnalysis.py            |   0
 .../pythonprocessors/google/SentimentAnalyzer.py   |  64 ++++++++++
 .../rocksdb-repos/DatabaseContentRepository.cpp    |   3 +-
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +-
 .../script/python/ExecutePythonProcessor.cpp       |   3 +-
 extensions/script/python/PythonCreator.h           |  11 +-
 libminifi/include/core/StreamManager.h             |   2 +-
 .../include/core/repository/FileSystemRepository.h |   2 +-
 .../core/repository/VolatileContentRepository.h    |   2 +-
 libminifi/include/io/FileStream.h                  |   4 +-
 libminifi/src/core/ProcessSession.cpp              |  14 ++-
 .../src/core/repository/FileSystemRepository.cpp   |   4 +-
 .../core/repository/VolatileContentRepository.cpp  |   2 +-
 libminifi/src/io/FileStream.cpp                    |   7 +-
 22 files changed, 343 insertions(+), 71 deletions(-)

diff --git a/extensions/jni/ExecuteJavaControllerService.cpp 
b/extensions/jni/ExecuteJavaControllerService.cpp
index 50210ea..7ea1585 100644
--- a/extensions/jni/ExecuteJavaControllerService.cpp
+++ b/extensions/jni/ExecuteJavaControllerService.cpp
@@ -97,10 +97,12 @@ void ExecuteJavaControllerService::onEnable() {
 
   clazzInstance = java_servicer_->newInstance(class_name_);
 
-  auto onEnabledName = java_servicer_->getAnnotation(class_name_, "OnEnabled");
+  auto methods_with_signatures = java_servicer_->getAnnotations(class_name_, 
"OnEnabled");
   current_cs_class = java_servicer_->getObjectClass(class_name_, 
clazzInstance);
   try {
-    current_cs_class.callVoidMethod(env, clazzInstance, 
onEnabledName.first.c_str(), onEnabledName.second, contextInstance);
+    for (const auto &mwithsig : methods_with_signatures) {
+      current_cs_class.callVoidMethod(env, clazzInstance, mwithsig.first, 
mwithsig.second, contextInstance);
+    }
   } catch (std::runtime_error &re) {
     // this can be ignored.
   }
diff --git a/extensions/jni/ExecuteJavaProcessor.cpp 
b/extensions/jni/ExecuteJavaProcessor.cpp
index 12f7e39..3a7d154 100644
--- a/extensions/jni/ExecuteJavaProcessor.cpp
+++ b/extensions/jni/ExecuteJavaProcessor.cpp
@@ -139,7 +139,7 @@ void ExecuteJavaProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext
   // create provided class
 
   clazzInstance = java_servicer_->newInstance(class_name_);
-  auto onScheduledName = java_servicer_->getAnnotation(class_name_, 
"OnScheduled");
+  auto onScheduledNames = java_servicer_->getAnnotations(class_name_, 
"OnScheduled");
   current_processor_class = java_servicer_->getObjectClass(class_name_, 
clazzInstance);
   // attempt to schedule here
 
@@ -158,7 +158,10 @@ void ExecuteJavaProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext
   java_servicer_->setReference<minifi::jni::JniProcessContext>(env, 
context_instance_, &jpc);
 
   try {
-    current_processor_class.callVoidMethod(env, clazzInstance, 
onScheduledName.first.c_str(), onScheduledName.second, context_instance_);
+
+    for (const auto &onScheduledName : onScheduledNames) {
+      current_processor_class.callVoidMethod(env, clazzInstance, 
onScheduledName.first.c_str(), onScheduledName.second, context_instance_);
+    }
   } catch (std::runtime_error &re) {
     // this can be ignored.
   }
diff --git a/extensions/jni/jvm/JavaControllerService.h 
b/extensions/jni/jvm/JavaControllerService.h
index 9721440..4843b77 100644
--- a/extensions/jni/jvm/JavaControllerService.h
+++ b/extensions/jni/jvm/JavaControllerService.h
@@ -133,6 +133,15 @@ class JavaControllerService : public 
core::controller::ControllerService, public
   std::pair<std::string, std::string> getAnnotation(const std::string 
&requested_name, const std::string &method_name) {
     return nar_loader_->getAnnotation(requested_name, method_name);
   }
+
+  /**
+   * Retrieves every method of the class that carries the given annotation, mapped to its JNI signature.
+   * @param requested_name class name
+   * @param annotation_name annotation to look for (e.g. "OnEnabled")
+   */
+    std::map<std::string, std::string> getAnnotations(const std::string 
&requested_name, const std::string &annotation_name) {
+      return nar_loader_->getAnnotations(requested_name, annotation_name);
+    }
   /**
    * creates a new instance
    * @param requested_name reqeusted class name
diff --git a/extensions/jni/jvm/JniProcessSession.cpp 
b/extensions/jni/jvm/JniProcessSession.cpp
index 9fd24f9..0baad8d 100644
--- a/extensions/jni/jvm/JniProcessSession.cpp
+++ b/extensions/jni/jvm/JniProcessSession.cpp
@@ -227,7 +227,14 @@ jobject 
Java_org_apache_nifi_processor_JniProcessSession_putAttribute(JNIEnv *en
     return nullptr;
   }
 
-  ptr->get()->addAttribute(JniStringToUTF(env, key), JniStringToUTF(env, 
value));
+  auto resKey = JniStringToUTF(env, key);
+  auto resValue = JniStringToUTF(env, value);
+
+  if (!ptr->get()->addAttribute(resKey, resValue)) {
+    if (resKey != "uuid") { // don't update the keyed attribute uuid
+      ptr->get()->updateAttribute(resKey, resValue);
+    }
+  }
 
   return ff;
 
@@ -445,8 +452,10 @@ jboolean 
Java_org_apache_nifi_processor_JniProcessSession_append(JNIEnv *env, jo
     jbyte* buffer = env->GetByteArrayElements(byteArray, 0);
     jsize length = env->GetArrayLength(byteArray);
 
-    minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
-    session->getSession()->append(ptr->get(), &outStream);
+    if (length > 0) {
+      minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
+      session->getSession()->append(ptr->get(), &outStream);
+    }
 
     env->ReleaseByteArrayElements(byteArray, buffer, 0);
 
diff --git a/extensions/jni/jvm/NarClassLoader.h 
b/extensions/jni/jvm/NarClassLoader.h
index a647059..2260aba 100644
--- a/extensions/jni/jvm/NarClassLoader.h
+++ b/extensions/jni/jvm/NarClassLoader.h
@@ -120,6 +120,58 @@ class NarClassLoader {
 
     return std::make_pair(methodName, signature);
   }
+
+  std::map<std::string, std::string> getAnnotations(const std::string 
&requested_name, const std::string &method_name) {
+    auto env = java_servicer_->attach();
+    std::vector<std::string> method_names;
+    std::map<std::string, std::string> methods_with_signatures;
+    {
+      jmethodID mthd = env->GetMethodID(class_ref_.getReference(), 
"getMethods", "(Ljava/lang/String;Ljava/lang/String;)Ljava/util/List;");
+      if (mthd == nullptr) {
+        ThrowIf(env);
+      }
+
+      auto clazz_name = env->NewStringUTF(requested_name.c_str());
+      auto annotation_name = env->NewStringUTF(method_name.c_str());
+
+      jobject jList = env->CallObjectMethod(class_loader_, mthd, clazz_name, 
annotation_name);
+      ThrowIf(env);
+      jclass cList = env->FindClass("java/util/List");
+      jmethodID mSize = env->GetMethodID(cList, "size", "()I");
+      jmethodID mGet = env->GetMethodID(cList, "get", "(I)Ljava/lang/Object;");
+
+      // get the size of the list
+      jint size = env->CallIntMethod(jList, mSize);
+      ThrowIf(env);
+      // walk through and fill the vector
+      for (jint i = 0; i < size; i++) {
+        jstring strObj = (jstring) env->CallObjectMethod(jList, mGet, i);
+        ThrowIf(env);
+        method_names.push_back(JniStringToUTF(env, strObj));
+      }
+    }
+    for (const auto &method_name_str : method_names) {
+      jmethodID mthd = env->GetMethodID(class_ref_.getReference(), 
"getMethodSignature", 
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
+      if (mthd == nullptr) {
+        ThrowIf(env);
+      }
+
+      auto clazz_name = env->NewStringUTF(requested_name.c_str());
+      auto jstring_method_name = env->NewStringUTF(method_name_str.c_str());
+      auto annotation_name = env->NewStringUTF(method_name.c_str());
+
+      jstring obj = (jstring) env->CallObjectMethod(class_loader_, mthd, 
clazz_name, jstring_method_name, annotation_name);
+
+      ThrowIf(env);
+
+      if (obj) {
+        auto signature = JniStringToUTF(env, obj);
+        methods_with_signatures[method_name_str] = signature;
+      }
+    }
+
+    return methods_with_signatures;
+  }
   /**
    * Call empty constructor
    */
diff --git a/extensions/jni/nifi-framework-jni/pom.xml 
b/extensions/jni/nifi-framework-jni/pom.xml
index 310f045..57bd7c6 100644
--- a/extensions/jni/nifi-framework-jni/pom.xml
+++ b/extensions/jni/nifi-framework-jni/pom.xml
@@ -34,7 +34,6 @@
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-api</artifactId>
       <version>${project.version}</version>
-
     </dependency>
     <dependency>
       <groupId>org.apache.nifi</groupId>
diff --git 
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
 
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
index cc6416a..abe81f2 100644
--- 
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
+++ 
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
@@ -49,7 +49,8 @@ public class JniClassLoader  {
 
     private static ConcurrentHashMap<String,Class<?>> classes = new 
ConcurrentHashMap<>();
 
-    private ConcurrentHashMap<Map.Entry<String,String>,Method> 
onScheduledMethod = new ConcurrentHashMap<>();
+    // Maps (className, annotation name) to the annotated methods. A plain map of lists avoids an external dependency (e.g. Guava).
+    private ConcurrentHashMap<Map.Entry<String,String>,List<Method>> 
annotatedMethods = new ConcurrentHashMap<>();
 
     private ConcurrentHashMap<String,JniComponent> componentMap = new 
ConcurrentHashMap<>();
 
@@ -400,47 +401,90 @@ public class JniClassLoader  {
                     methods.add(method);
                 }
             }
-            if (methods.isEmpty())
-                klass = klass.getSuperclass();
-            else
-                break;
+            klass = klass.getSuperclass();
+
         }
         return methods;
     }
 
 
     public String getMethod(final String className, final String annotation){
-        Method mthd = onScheduledMethod.get(new 
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+        Collection<String> methods = getMethods(className,annotation);
+        if (!methods.isEmpty()){
+            return methods.iterator().next();
+        }
+        return null;
+    }
 
-        if (mthd == null)
+    public List<String> getMethods(final String className, final String 
annotation){
+        List<Method> mthds = annotatedMethods.get(new 
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+        List<String> methods = new ArrayList<>();
+        if (mthds != null)
         {
-            return null;
+            for(Method mthd : mthds){
+                methods.add(mthd.getName());
+            }
         }
 
-        return mthd.getName();
+        return methods;
     }
 
     public String getSignature(final String className, final String 
annotation){
-        Method mthd = onScheduledMethod.get(new 
AbstractMap.SimpleImmutableEntry<>(className,annotation));
-        if (mthd == null)
-        {
-            return null;
+        Map<String,String> signatureMap = getSignatures(className,annotation);
+        if (!signatureMap.isEmpty()){
+            return signatureMap.entrySet().iterator().next().getValue();
         }
-        String ret = "", argTypes="";
-        if (mthd.getReturnType().equals(Void.TYPE)){
-            ret = "V";
-        }
-        else{
-            ret = classToType(mthd.getReturnType());
-        }
-        argTypes = "(";
-        for(Class<?> type : mthd.getParameterTypes()){
-            argTypes += classToType(type);
+        return null;
+    }
+
+    public String getMethodSignature(final String className, String method, 
final String annotation){
+        Collection<Method> mthds = annotatedMethods.get(new 
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+        if (mthds != null)
+        {
+            for(Method mthd : mthds) {
+                if (mthd.getName().equals(method)) {
+                    String ret = "", argTypes = "";
+                    if (mthd.getReturnType().equals(Void.TYPE)) {
+                        ret = "V";
+                    } else {
+                        ret = classToType(mthd.getReturnType());
+                    }
+                    argTypes = "(";
+                    for (Class<?> type : mthd.getParameterTypes()) {
+                        argTypes += classToType(type);
+                    }
+
+                    argTypes += ")";
+                    return argTypes + ret;
+                }
+            }
         }
+        return null;
+    }
 
-        argTypes += ")";
+    public Map<String,String> getSignatures(final String className, final 
String annotation){
+        Collection<Method> mthds = annotatedMethods.get(new 
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+        Map<String,String> signatureMap = new HashMap<>();
+        if (mthds != null)
+        {
+            for(Method mthd : mthds) {
+                String ret = "", argTypes = "";
+                if (mthd.getReturnType().equals(Void.TYPE)) {
+                    ret = "V";
+                } else {
+                    ret = classToType(mthd.getReturnType());
+                }
+                argTypes = "(";
+                for (Class<?> type : mthd.getParameterTypes()) {
+                    argTypes += classToType(type);
+                }
+
+                argTypes += ")";
+                signatureMap.put(mthd.getName(),argTypes + ret);
+            }
+        }
+        return signatureMap;
 
-        return argTypes + ret;
     }
 
     private static String classToType(Class<?> type){
@@ -482,14 +526,40 @@ public class JniClassLoader  {
                 logger.warn("Could not find {}", className);
                 return null;
             } else {
-                List<Method> methods = getAnnotatedMethods(clazz, 
OnScheduled.class);
-                methods.stream().forEach(mthd -> onScheduledMethod.put(new 
AbstractMap.SimpleImmutableEntry<>(className, "OnScheduled"), mthd));
-
-                methods = getAnnotatedMethods(clazz, OnEnabled.class);
-                methods.stream().forEach(mthd -> onScheduledMethod.put(new 
AbstractMap.SimpleImmutableEntry<>(className, "OnEnabled"), mthd));
-
-                methods = getAnnotatedMethods(clazz, OnDisabled.class);
-                methods.stream().forEach(mthd -> onScheduledMethod.put(new 
AbstractMap.SimpleImmutableEntry<>(className, "OnDisabled"), mthd));
+                synchronized (annotatedMethods) {
+                    List<Method> methods = getAnnotatedMethods(clazz, 
OnScheduled.class);
+                    methods.stream().forEach(mthd -> {
+                        final Map.Entry<String, String> ent = new 
AbstractMap.SimpleImmutableEntry<>(className, "OnScheduled");
+                        List<Method> methodList = annotatedMethods.get(ent);
+                        if (methodList == null) {
+                            methodList = new ArrayList<>();
+                            annotatedMethods.put(ent, methodList);
+                        }
+                        methodList.add(mthd);
+                    });
+
+                    methods = getAnnotatedMethods(clazz, OnEnabled.class);
+                    methods.stream().forEach(mthd -> {
+                        final Map.Entry<String, String> ent = new 
AbstractMap.SimpleImmutableEntry<>(className, "OnEnabled");
+                        List<Method> methodList = annotatedMethods.get(ent);
+                        if (methodList == null) {
+                            methodList = new ArrayList<>();
+                            annotatedMethods.put(ent, methodList);
+                        }
+                        methodList.add(mthd);
+                    });
+
+                    methods = getAnnotatedMethods(clazz, OnDisabled.class);
+                    methods.stream().forEach(mthd -> {
+                        final Map.Entry<String, String> ent = new 
AbstractMap.SimpleImmutableEntry<>(className, "OnDisabled");
+                        List<Method> methodList = annotatedMethods.get(ent);
+                        if (methodList == null) {
+                            methodList = new ArrayList<>();
+                            annotatedMethods.put(ent, methodList);
+                        }
+                        methodList.add(mthd);
+                    });
+                }
             }
 
             return clazz.newInstance();
diff --git 
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
 
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
index f1a1385..b2f392b 100644
--- 
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
+++ 
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
@@ -1,6 +1,5 @@
 package org.apache.nifi.processor;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
@@ -15,7 +14,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.regex.Pattern;
-import java.util.stream.IntStream;
 
 public class JniProcessSession implements ProcessSession {
 
@@ -41,7 +39,6 @@ public class JniProcessSession implements ProcessSession {
 
     @Override
     public void adjustCounter(String name, long delta, boolean immediate) {
-
     }
 
     @Override
@@ -173,20 +170,39 @@ public class JniProcessSession implements ProcessSession {
         }
     }
 
+    /**
+     * Wraps the native stream in a BufferedInputStream. This is not ideal, but some consumers expect it.
+     * Case in point, the CSV Reader:
+     *   createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger)
+     *
+     * That method accepts a plain InputStream but assumes it is actually a BufferedInputStream.
+     * It could be fixed there, but other consumers may make the same assumption, so we keep using
+     * BufferedInputStream here until we can safely move away from it.
+     */
     @Override
     public void read(FlowFile source, InputStreamCallback reader) throws 
FlowFileAccessException {
         try {
-            final JniInputStream input = readFlowFile(source);
+            final BufferedInputStream input = new BufferedInputStream( 
readFlowFile(source) );
             if (input != null)
                 reader.process(input);
         } catch (IOException e) {
+            e.printStackTrace();
             throw new FlowFileAccessException("Could not read from native 
source", e);
         }
     }
 
     @Override
     public InputStream read(FlowFile flowFile) {
-        return readFlowFile(flowFile);
+        /**
+         * Wraps the native stream in a BufferedInputStream. This is not ideal, but some consumers expect it.
+         * Case in point, the CSV Reader:
+         *   createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger)
+         *
+         * That method accepts a plain InputStream but assumes it is actually a BufferedInputStream.
+         * It could be fixed there, but other consumers may make the same assumption, so we keep using
+         * BufferedInputStream here until we can safely move away from it.
+         */
+        return new BufferedInputStream(readFlowFile(flowFile));
     }
 
     @Override
@@ -230,7 +246,7 @@ public class JniProcessSession implements ProcessSession {
             ByteArrayOutputStream bin = new ByteArrayOutputStream();
             @Override
             public void write(int b) throws IOException {
-                synchronized (bin) {
+                synchronized (this) {
                     bin.write(b);
                     // better suited to writing pages of memory
                     if (bin.size() > 4096) {
@@ -241,7 +257,16 @@ public class JniProcessSession implements ProcessSession {
 
             @Override
             public void flush() throws IOException {
-                synchronized (bin) {
+                synchronized (this) {
+                    // flush as an append.
+                    flushByterArray();
+                }
+            }
+
+
+            @Override
+            public void close() throws IOException {
+                synchronized (this) {
                     // flush as an append.
                     flushByterArray();
                 }
@@ -251,6 +276,7 @@ public class JniProcessSession implements ProcessSession {
                 append(source,bin.toByteArray());
                 bin = new ByteArrayOutputStream();
             }
+
         };
     }
 
@@ -283,12 +309,30 @@ public class JniProcessSession implements ProcessSession {
         return source;
     }
 
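+    /**
+     * Copies all bytes from in to out through a small (1 KiB) buffer. Used instead of IOUtils.copy, which was slow here.
+     * The underlying streams may buffer as well, so a small buffer keeps the footprint low. Neither stream is closed.
+     * @param in input stream, read until end of stream
+     * @param out output stream to write to
+     * @throws IOException if reading from in or writing to out fails
+     */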
+    private static void copyData(InputStream in, OutputStream out) throws 
IOException {
+        byte[] buffer = new byte[1 * 1024];
+        int len;
+        while ((len = in.read(buffer)) > 0) {
+            out.write(buffer, 0, len);
+        }
+    }
+
+
     @Override
     public FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile 
destination){
         try {
-            IOUtils.copy(Files.newInputStream(source),write(destination));
-            if (!keepSourceFile){
-                Files.delete(source);
+            try(OutputStream out = write(destination)) {
+                copyData(Files.newInputStream(source), out);
+                if (!keepSourceFile) {
+                    Files.delete(source);
+                }
             }
         } catch (IOException e) {
             return null;
@@ -299,7 +343,9 @@ public class JniProcessSession implements ProcessSession {
     @Override
     public FlowFile importFrom(InputStream source, FlowFile destination){
         try {
-            IOUtils.copy(source,write(destination));
+            try(OutputStream out = write(destination)) {
+                copyData(source, out);
+            }
         } catch (IOException e) {
             return null;
         }
diff --git a/extensions/pythonprocessors/SentimentAnalysis.py 
b/extensions/pythonprocessors/examples/SentimentAnalysis.py
similarity index 100%
rename from extensions/pythonprocessors/SentimentAnalysis.py
rename to extensions/pythonprocessors/examples/SentimentAnalysis.py
diff --git a/extensions/pythonprocessors/google/SentimentAnalyzer.py 
b/extensions/pythonprocessors/google/SentimentAnalyzer.py
new file mode 100644
index 0000000..3d54af0
--- /dev/null
+++ b/extensions/pythonprocessors/google/SentimentAnalyzer.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+  Install the following with pip (or pip3):
+
+  pip install google-cloud-language
+
+  The following were also needed during development because of SSL timeout errors:
+  pip install requests[security]
+  pip install -U httplib2
+"""
+import json
+import sys
+import codecs
+from google.cloud import language
+from google.cloud.language import enums
+from google.cloud.language import types
+
+def describe(processor):
+    processor.setDescription("Performs a sentiment Analysis of incoming 
flowfile content using Google Cloud.")
+
+def onInitialize(processor):
+    # Credentials Path is required: the Google service-account JSON file used to authenticate.
+    processor.addProperty("Credentials Path","Path to your Google Credentials 
JSON File. Must exist on agent hosts.","", True, False)
+
+class ContentExtract(object):
+  def __init__(self):
+    self.content = None
+
+  def process(self, input_stream):
+    self.content = codecs.getreader('utf-8')(input_stream).read()
+    return len(self.content)
+
+
+def onTrigger(context, session):
+  flow_file = session.get()
+  if flow_file is not None:
+    credentials_filename = context.getProperty("Credentials Path")
+    sentiment = ContentExtract()
+    session.read(flow_file,sentiment)
+    client = 
language.LanguageServiceClient.from_service_account_json(credentials_filename)
+    document = 
types.Document(content=sentiment.content,type=enums.Document.Type.PLAIN_TEXT)
+    
+    annotations = client.analyze_sentiment(document=document, retry = 
None,timeout=1.0 )
+    score = annotations.document_sentiment.score
+    magnitude = annotations.document_sentiment.magnitude
+    
+    flow_file.addAttribute("score",str(score))
+    flow_file.addAttribute("magnitude",str(magnitude))
+    session.transfer(flow_file, REL_SUCCESS)
+
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 5842bf2..bdcc3e5 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -61,11 +61,12 @@ void DatabaseContentRepository::stop() {
   }
 }
 
-std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
   // the traditional approach with these has been to return -1 from the 
stream; however, since we have the ability here
   // we can simply return a nullptr, which is also valid from the API when 
this stream is not valid.
   if (nullptr == claim || !is_valid_ || !db_)
     return nullptr;
+  // append is already supported in all modes
   return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, 
true);
 }
 
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 7e5705f..6d12460 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -86,7 +86,7 @@ class DatabaseContentRepository : public 
core::ContentRepository, public core::C
 
   virtual void stop();
 
-  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
 
   virtual std::shared_ptr<io::BaseStream> read(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
 
diff --git a/extensions/script/python/ExecutePythonProcessor.cpp 
b/extensions/script/python/ExecutePythonProcessor.cpp
index 249ff09..66d8bf6 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -55,13 +55,12 @@ void ExecutePythonProcessor::initialize() {
   properties.insert(ScriptFile);
   properties.insert(ModuleDirectory);
   setSupportedProperties(properties);
-  setSupportedProperties(std::move(properties));
 
   std::set<core::Relationship> relationships;
   relationships.insert(Success);
   relationships.insert(Failure);
   setSupportedRelationships(std::move(relationships));
-
+  setAcceptAllProperties();
   if (!prop.empty()) {
     setProperty(ScriptFile, prop);
     std::shared_ptr<script::ScriptEngine> engine;
diff --git a/extensions/script/python/PythonCreator.h 
b/extensions/script/python/PythonCreator.h
index fb87592..0a320d0 100644
--- a/extensions/script/python/PythonCreator.h
+++ b/extensions/script/python/PythonCreator.h
@@ -93,11 +93,17 @@ class PythonCreator : public minifi::core::CoreComponent {
       core::ClassLoader::getDefaultClassLoader().registerResource("", 
"createPyProcFactory");
 
       for (const auto &path : classpaths_) {
-
         const auto &scriptName = getScriptName(path);
 
         utils::Identifier uuid;
-        auto processor = 
std::dynamic_pointer_cast<core::Processor>(core::ClassLoader::getDefaultClassLoader().instantiate(scriptName,
 uuid));
+
+        std::string loadName = scriptName;
+        const auto &package = getPackage(pathListings, path);
+
+        if (!package.empty())
+          loadName = "org.apache.nifi.minifi.processors." + package + "." + 
scriptName;
+
+        auto processor = 
std::dynamic_pointer_cast<core::Processor>(core::ClassLoader::getDefaultClassLoader().instantiate(loadName,
 uuid));
         if (processor) {
           try {
             processor->initialize();
@@ -112,6 +118,7 @@ class PythonCreator : public minifi::core::CoreComponent {
             details.artifact = getFileName(path);
             details.version = minifi::AgentBuild::VERSION;
             details.group = "python";
+
             minifi::ClassDescription description(script_with_package);
             description.dynamic_properties_ = 
proc->getPythonSupportDynamicProperties();
             auto properties = proc->getPythonProperties();
diff --git a/libminifi/include/core/StreamManager.h 
b/libminifi/include/core/StreamManager.h
index 5de9dbe..65e0414 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -48,7 +48,7 @@ class StreamManager {
    * @param streamId stream identifier
    * @return stream pointer.
    */
-  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T> 
&streamId) = 0;
+  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T> 
&streamId, bool append = false) = 0;
 
   /**
    * Create a read stream using the streamId as a reference.
diff --git a/libminifi/include/core/repository/FileSystemRepository.h 
b/libminifi/include/core/repository/FileSystemRepository.h
index 4a5ad5e..56a103c 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -49,7 +49,7 @@ class FileSystemRepository : public core::ContentRepository, 
public core::CoreCo
 
   bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
 
-  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
 
   virtual std::shared_ptr<io::BaseStream> read(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
 
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h 
b/libminifi/include/core/repository/VolatileContentRepository.h
index e203e96..d792329 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -77,7 +77,7 @@ class VolatileContentRepository : public 
core::ContentRepository, public virtual
    * @param claim resource claim
    * @return BaseStream shared pointer that represents the stream the consumer 
will write to.
    */
-  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim, bool append);
 
   /**
    * Creates readable stream.
diff --git a/libminifi/include/io/FileStream.h 
b/libminifi/include/io/FileStream.h
index 94d13b2..863f069 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -50,8 +50,10 @@ class FileStream : public io::BaseStream {
   /**
    * File Stream constructor that accepts an fstream shared pointer.
    * It must already be initialized for read and write.
+   * @param path path to file
+   * @param append identifies if this is an append or overwriting the file
    */
-  explicit FileStream(const std::string &path);
+  explicit FileStream(const std::string &path, bool append = false);
 
   virtual ~FileStream() {
     closeStream();
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index dcc9a78..40ee900 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -298,7 +298,6 @@ void ProcessSession::write(const 
std::shared_ptr<core::FlowFile> &flow, OutputSt
 
 void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, 
OutputStreamCallback *callback) {
   std::shared_ptr<ResourceClaim> claim = nullptr;
-
   if (flow->getResourceClaim() == nullptr) {
     // No existed claim for append, we need to create new claim
     return write(flow, callback);
@@ -308,20 +307,23 @@ void ProcessSession::append(const 
std::shared_ptr<core::FlowFile> &flow, OutputS
 
   try {
     uint64_t startTime = getTimeMillis();
-    std::shared_ptr<io::BaseStream> stream = 
process_context_->getContentRepository()->write(claim);
+    std::shared_ptr<io::BaseStream> stream = 
process_context_->getContentRepository()->write(claim, true);
     if (nullptr == stream) {
       rollback();
       return;
     }
     // Call the callback to write the content
+
     size_t oldPos = stream->getSize();
-    stream->seek(oldPos + 1);
+    // this prevents an issue if we write, above, with zero length.
+    if (oldPos > 0)
+      stream->seek(oldPos + 1);
     if (callback->process(stream) < 0) {
       rollback();
       return;
     }
     uint64_t appendSize = stream->getSize() - oldPos;
-    flow->setSize(flow->getSize() + appendSize);
+    flow->setSize(stream->getSize());
 
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify 
flow record content " << flow->getUUIDStr();
@@ -342,6 +344,10 @@ void ProcessSession::read(const 
std::shared_ptr<core::FlowFile> &flow, InputStre
 
     if (flow->getResourceClaim() == nullptr) {
       // No existed claim for read, we throw exception
+      logger_->log_debug("For %s, no resource claim but size is %d", 
flow->getUUIDStr(), flow->getSize());
+      if (flow->getSize() == 0) {
+        return;
+      }
       throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for 
read");
     }
 
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp 
b/libminifi/src/core/repository/FileSystemRepository.cpp
index c0f1694..4607d74 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -42,8 +42,8 @@ bool FileSystemRepository::initialize(const 
std::shared_ptr<minifi::Configure> &
 void FileSystemRepository::stop() {
 }
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim) {
-  return std::make_shared<io::FileStream>(claim->getContentFullPath());
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
+  return std::make_shared<io::FileStream>(claim->getContentFullPath(), append);
 }
 
 bool FileSystemRepository::exists(const std::shared_ptr<minifi::ResourceClaim> 
&streamId) {
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp 
b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 674566b..9b5f9be 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -87,7 +87,7 @@ void VolatileContentRepository::start() {
   logger_->log_info("%s Repository Monitor Thread Start", getName());
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
   logger_->log_info("enter write for %s", claim->getContentFullPath());
   {
     std::lock_guard<std::mutex> lock(map_mutex_);
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 20cf7a9..6ee0e95 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -28,12 +28,15 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-FileStream::FileStream(const std::string &path)
+FileStream::FileStream(const std::string &path, bool append)
     : logger_(logging::LoggerFactory<FileStream>::getLogger()),
       path_(path),
       offset_(0) {
   file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
-  file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
+  if (append)
+    file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out | 
std::fstream::app | std::fstream::binary);
+  else
+    file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
   file_stream_->seekg(0, file_stream_->end);
   file_stream_->seekp(0, file_stream_->end);
   int len = file_stream_->tellg();

Reply via email to