This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 58905c2 MINIFICPP-809: Better support python capabilities and
recordreader/writers
58905c2 is described below
commit 58905c29ab563af4c579eb6467d17b80984e5ea2
Author: Marc Parisi <[email protected]>
AuthorDate: Fri Apr 19 11:27:01 2019 -0400
MINIFICPP-809: Better support python capabilities and recordreader/writers
MINIFICPP-809: better support append
MINIFICPP-809: Add google sentiment analyzer example
MINIFICPP-809: Fix linter issues
MINIFICPP-809: Resolving attribute issue seen during demo
This closes #540.
Signed-off-by: Marc Parisi <[email protected]>
---
extensions/jni/ExecuteJavaControllerService.cpp | 6 +-
extensions/jni/ExecuteJavaProcessor.cpp | 7 +-
extensions/jni/jvm/JavaControllerService.h | 9 ++
extensions/jni/jvm/JniProcessSession.cpp | 15 ++-
extensions/jni/jvm/NarClassLoader.h | 52 ++++++++
extensions/jni/nifi-framework-jni/pom.xml | 1 -
.../org/apache/nifi/processor/JniClassLoader.java | 136 ++++++++++++++++-----
.../apache/nifi/processor/JniProcessSession.java | 68 +++++++++--
.../{ => examples}/SentimentAnalysis.py | 0
.../pythonprocessors/google/SentimentAnalyzer.py | 64 ++++++++++
.../rocksdb-repos/DatabaseContentRepository.cpp | 3 +-
.../rocksdb-repos/DatabaseContentRepository.h | 2 +-
.../script/python/ExecutePythonProcessor.cpp | 3 +-
extensions/script/python/PythonCreator.h | 11 +-
libminifi/include/core/StreamManager.h | 2 +-
.../include/core/repository/FileSystemRepository.h | 2 +-
.../core/repository/VolatileContentRepository.h | 2 +-
libminifi/include/io/FileStream.h | 4 +-
libminifi/src/core/ProcessSession.cpp | 14 ++-
.../src/core/repository/FileSystemRepository.cpp | 4 +-
.../core/repository/VolatileContentRepository.cpp | 2 +-
libminifi/src/io/FileStream.cpp | 7 +-
22 files changed, 343 insertions(+), 71 deletions(-)
diff --git a/extensions/jni/ExecuteJavaControllerService.cpp
b/extensions/jni/ExecuteJavaControllerService.cpp
index 50210ea..7ea1585 100644
--- a/extensions/jni/ExecuteJavaControllerService.cpp
+++ b/extensions/jni/ExecuteJavaControllerService.cpp
@@ -97,10 +97,12 @@ void ExecuteJavaControllerService::onEnable() {
clazzInstance = java_servicer_->newInstance(class_name_);
- auto onEnabledName = java_servicer_->getAnnotation(class_name_, "OnEnabled");
+ auto methods_with_signatures = java_servicer_->getAnnotations(class_name_,
"OnEnabled");
current_cs_class = java_servicer_->getObjectClass(class_name_,
clazzInstance);
try {
- current_cs_class.callVoidMethod(env, clazzInstance,
onEnabledName.first.c_str(), onEnabledName.second, contextInstance);
+ for (const auto &mwithsig : methods_with_signatures) {
+ current_cs_class.callVoidMethod(env, clazzInstance, mwithsig.first,
mwithsig.second, contextInstance);
+ }
} catch (std::runtime_error &re) {
// this can be ignored.
}
diff --git a/extensions/jni/ExecuteJavaProcessor.cpp
b/extensions/jni/ExecuteJavaProcessor.cpp
index 12f7e39..3a7d154 100644
--- a/extensions/jni/ExecuteJavaProcessor.cpp
+++ b/extensions/jni/ExecuteJavaProcessor.cpp
@@ -139,7 +139,7 @@ void ExecuteJavaProcessor::onSchedule(const
std::shared_ptr<core::ProcessContext
// create provided class
clazzInstance = java_servicer_->newInstance(class_name_);
- auto onScheduledName = java_servicer_->getAnnotation(class_name_,
"OnScheduled");
+ auto onScheduledNames = java_servicer_->getAnnotations(class_name_,
"OnScheduled");
current_processor_class = java_servicer_->getObjectClass(class_name_,
clazzInstance);
// attempt to schedule here
@@ -158,7 +158,10 @@ void ExecuteJavaProcessor::onSchedule(const
std::shared_ptr<core::ProcessContext
java_servicer_->setReference<minifi::jni::JniProcessContext>(env,
context_instance_, &jpc);
try {
- current_processor_class.callVoidMethod(env, clazzInstance,
onScheduledName.first.c_str(), onScheduledName.second, context_instance_);
+
+ for (const auto &onScheduledName : onScheduledNames) {
+ current_processor_class.callVoidMethod(env, clazzInstance,
onScheduledName.first.c_str(), onScheduledName.second, context_instance_);
+ }
} catch (std::runtime_error &re) {
// this can be ignored.
}
diff --git a/extensions/jni/jvm/JavaControllerService.h
b/extensions/jni/jvm/JavaControllerService.h
index 9721440..4843b77 100644
--- a/extensions/jni/jvm/JavaControllerService.h
+++ b/extensions/jni/jvm/JavaControllerService.h
@@ -133,6 +133,15 @@ class JavaControllerService : public
core::controller::ControllerService, public
std::pair<std::string, std::string> getAnnotation(const std::string
&requested_name, const std::string &method_name) {
return nar_loader_->getAnnotation(requested_name, method_name);
}
+
+ /**
+ * Retrieves the methods that match an annotation
+ * @param requested_name class name,
+ * @param annotation_name annotation name to look up
+ */
+ std::map<std::string, std::string> getAnnotations(const std::string
&requested_name, const std::string &annotation_name) {
+ return nar_loader_->getAnnotations(requested_name, annotation_name);
+ }
/**
* creates a new instance
* @param requested_name reqeusted class name
diff --git a/extensions/jni/jvm/JniProcessSession.cpp
b/extensions/jni/jvm/JniProcessSession.cpp
index 9fd24f9..0baad8d 100644
--- a/extensions/jni/jvm/JniProcessSession.cpp
+++ b/extensions/jni/jvm/JniProcessSession.cpp
@@ -227,7 +227,14 @@ jobject
Java_org_apache_nifi_processor_JniProcessSession_putAttribute(JNIEnv *en
return nullptr;
}
- ptr->get()->addAttribute(JniStringToUTF(env, key), JniStringToUTF(env,
value));
+ auto resKey = JniStringToUTF(env, key);
+ auto resValue = JniStringToUTF(env, value);
+
+ if (!ptr->get()->addAttribute(resKey, resValue)) {
+ if (resKey != "uuid") { // don't update the keyed attribute uuid
+ ptr->get()->updateAttribute(resKey, resValue);
+ }
+ }
return ff;
@@ -445,8 +452,10 @@ jboolean
Java_org_apache_nifi_processor_JniProcessSession_append(JNIEnv *env, jo
jbyte* buffer = env->GetByteArrayElements(byteArray, 0);
jsize length = env->GetArrayLength(byteArray);
- minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
- session->getSession()->append(ptr->get(), &outStream);
+ if (length > 0) {
+ minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
+ session->getSession()->append(ptr->get(), &outStream);
+ }
env->ReleaseByteArrayElements(byteArray, buffer, 0);
diff --git a/extensions/jni/jvm/NarClassLoader.h
b/extensions/jni/jvm/NarClassLoader.h
index a647059..2260aba 100644
--- a/extensions/jni/jvm/NarClassLoader.h
+++ b/extensions/jni/jvm/NarClassLoader.h
@@ -120,6 +120,58 @@ class NarClassLoader {
return std::make_pair(methodName, signature);
}
+
+ std::map<std::string, std::string> getAnnotations(const std::string
&requested_name, const std::string &method_name) {
+ auto env = java_servicer_->attach();
+ std::vector<std::string> method_names;
+ std::map<std::string, std::string> methods_with_signatures;
+ {
+ jmethodID mthd = env->GetMethodID(class_ref_.getReference(),
"getMethods", "(Ljava/lang/String;Ljava/lang/String;)Ljava/util/List;");
+ if (mthd == nullptr) {
+ ThrowIf(env);
+ }
+
+ auto clazz_name = env->NewStringUTF(requested_name.c_str());
+ auto annotation_name = env->NewStringUTF(method_name.c_str());
+
+ jobject jList = env->CallObjectMethod(class_loader_, mthd, clazz_name,
annotation_name);
+ ThrowIf(env);
+ jclass cList = env->FindClass("java/util/List");
+ jmethodID mSize = env->GetMethodID(cList, "size", "()I");
+ jmethodID mGet = env->GetMethodID(cList, "get", "(I)Ljava/lang/Object;");
+
+ // get the size of the list
+ jint size = env->CallIntMethod(jList, mSize);
+ ThrowIf(env);
+ // walk through and fill the vector
+ for (jint i = 0; i < size; i++) {
+ jstring strObj = (jstring) env->CallObjectMethod(jList, mGet, i);
+ ThrowIf(env);
+ method_names.push_back(JniStringToUTF(env, strObj));
+ }
+ }
+ for (const auto &method_name_str : method_names) {
+ jmethodID mthd = env->GetMethodID(class_ref_.getReference(),
"getMethodSignature",
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
+ if (mthd == nullptr) {
+ ThrowIf(env);
+ }
+
+ auto clazz_name = env->NewStringUTF(requested_name.c_str());
+ auto jstring_method_name = env->NewStringUTF(method_name_str.c_str());
+ auto annotation_name = env->NewStringUTF(method_name.c_str());
+
+ jstring obj = (jstring) env->CallObjectMethod(class_loader_, mthd,
clazz_name, jstring_method_name, annotation_name);
+
+ ThrowIf(env);
+
+ if (obj) {
+ auto signature = JniStringToUTF(env, obj);
+ methods_with_signatures[method_name_str] = signature;
+ }
+ }
+
+ return methods_with_signatures;
+ }
/**
* Call empty constructor
*/
diff --git a/extensions/jni/nifi-framework-jni/pom.xml
b/extensions/jni/nifi-framework-jni/pom.xml
index 310f045..57bd7c6 100644
--- a/extensions/jni/nifi-framework-jni/pom.xml
+++ b/extensions/jni/nifi-framework-jni/pom.xml
@@ -34,7 +34,6 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${project.version}</version>
-
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
index cc6416a..abe81f2 100644
---
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
+++
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniClassLoader.java
@@ -49,7 +49,8 @@ public class JniClassLoader {
private static ConcurrentHashMap<String,Class<?>> classes = new
ConcurrentHashMap<>();
- private ConcurrentHashMap<Map.Entry<String,String>,Method>
onScheduledMethod = new ConcurrentHashMap<>();
+ // avoiding external dependencies ( such as with Guava );
+ private ConcurrentHashMap<Map.Entry<String,String>,List<Method>>
annotatedMethods = new ConcurrentHashMap<>();
private ConcurrentHashMap<String,JniComponent> componentMap = new
ConcurrentHashMap<>();
@@ -400,47 +401,90 @@ public class JniClassLoader {
methods.add(method);
}
}
- if (methods.isEmpty())
- klass = klass.getSuperclass();
- else
- break;
+ klass = klass.getSuperclass();
+
}
return methods;
}
public String getMethod(final String className, final String annotation){
- Method mthd = onScheduledMethod.get(new
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+ Collection<String> methods = getMethods(className,annotation);
+ if (!methods.isEmpty()){
+ return methods.iterator().next();
+ }
+ return null;
+ }
- if (mthd == null)
+ public List<String> getMethods(final String className, final String
annotation){
+ List<Method> mthds = annotatedMethods.get(new
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+ List<String> methods = new ArrayList<>();
+ if (mthds != null)
{
- return null;
+ for(Method mthd : mthds){
+ methods.add(mthd.getName());
+ }
}
- return mthd.getName();
+ return methods;
}
public String getSignature(final String className, final String
annotation){
- Method mthd = onScheduledMethod.get(new
AbstractMap.SimpleImmutableEntry<>(className,annotation));
- if (mthd == null)
- {
- return null;
+ Map<String,String> signatureMap = getSignatures(className,annotation);
+ if (!signatureMap.isEmpty()){
+ return signatureMap.entrySet().iterator().next().getValue();
}
- String ret = "", argTypes="";
- if (mthd.getReturnType().equals(Void.TYPE)){
- ret = "V";
- }
- else{
- ret = classToType(mthd.getReturnType());
- }
- argTypes = "(";
- for(Class<?> type : mthd.getParameterTypes()){
- argTypes += classToType(type);
+ return null;
+ }
+
+ public String getMethodSignature(final String className, String method,
final String annotation){
+ Collection<Method> mthds = annotatedMethods.get(new
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+ if (mthds != null)
+ {
+ for(Method mthd : mthds) {
+ if (mthd.getName().equals(method)) {
+ String ret = "", argTypes = "";
+ if (mthd.getReturnType().equals(Void.TYPE)) {
+ ret = "V";
+ } else {
+ ret = classToType(mthd.getReturnType());
+ }
+ argTypes = "(";
+ for (Class<?> type : mthd.getParameterTypes()) {
+ argTypes += classToType(type);
+ }
+
+ argTypes += ")";
+ return argTypes + ret;
+ }
+ }
}
+ return null;
+ }
- argTypes += ")";
+ public Map<String,String> getSignatures(final String className, final
String annotation){
+ Collection<Method> mthds = annotatedMethods.get(new
AbstractMap.SimpleImmutableEntry<>(className,annotation));
+ Map<String,String> signatureMap = new HashMap<>();
+ if (mthds != null)
+ {
+ for(Method mthd : mthds) {
+ String ret = "", argTypes = "";
+ if (mthd.getReturnType().equals(Void.TYPE)) {
+ ret = "V";
+ } else {
+ ret = classToType(mthd.getReturnType());
+ }
+ argTypes = "(";
+ for (Class<?> type : mthd.getParameterTypes()) {
+ argTypes += classToType(type);
+ }
+
+ argTypes += ")";
+ signatureMap.put(mthd.getName(),argTypes + ret);
+ }
+ }
+ return signatureMap;
- return argTypes + ret;
}
private static String classToType(Class<?> type){
@@ -482,14 +526,40 @@ public class JniClassLoader {
logger.warn("Could not find {}", className);
return null;
} else {
- List<Method> methods = getAnnotatedMethods(clazz,
OnScheduled.class);
- methods.stream().forEach(mthd -> onScheduledMethod.put(new
AbstractMap.SimpleImmutableEntry<>(className, "OnScheduled"), mthd));
-
- methods = getAnnotatedMethods(clazz, OnEnabled.class);
- methods.stream().forEach(mthd -> onScheduledMethod.put(new
AbstractMap.SimpleImmutableEntry<>(className, "OnEnabled"), mthd));
-
- methods = getAnnotatedMethods(clazz, OnDisabled.class);
- methods.stream().forEach(mthd -> onScheduledMethod.put(new
AbstractMap.SimpleImmutableEntry<>(className, "OnDisabled"), mthd));
+ synchronized (annotatedMethods) {
+ List<Method> methods = getAnnotatedMethods(clazz,
OnScheduled.class);
+ methods.stream().forEach(mthd -> {
+ final Map.Entry<String, String> ent = new
AbstractMap.SimpleImmutableEntry<>(className, "OnScheduled");
+ List<Method> methodList = annotatedMethods.get(ent);
+ if (methodList == null) {
+ methodList = new ArrayList<>();
+ annotatedMethods.put(ent, methodList);
+ }
+ methodList.add(mthd);
+ });
+
+ methods = getAnnotatedMethods(clazz, OnEnabled.class);
+ methods.stream().forEach(mthd -> {
+ final Map.Entry<String, String> ent = new
AbstractMap.SimpleImmutableEntry<>(className, "OnEnabled");
+ List<Method> methodList = annotatedMethods.get(ent);
+ if (methodList == null) {
+ methodList = new ArrayList<>();
+ annotatedMethods.put(ent, methodList);
+ }
+ methodList.add(mthd);
+ });
+
+ methods = getAnnotatedMethods(clazz, OnDisabled.class);
+ methods.stream().forEach(mthd -> {
+ final Map.Entry<String, String> ent = new
AbstractMap.SimpleImmutableEntry<>(className, "OnDisabled");
+ List<Method> methodList = annotatedMethods.get(ent);
+ if (methodList == null) {
+ methodList = new ArrayList<>();
+ annotatedMethods.put(ent, methodList);
+ }
+ methodList.add(mthd);
+ });
+ }
}
return clazz.newInstance();
diff --git
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
index f1a1385..b2f392b 100644
---
a/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
+++
b/extensions/jni/nifi-framework-jni/src/main/java/org/apache/nifi/processor/JniProcessSession.java
@@ -1,6 +1,5 @@
package org.apache.nifi.processor;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
@@ -15,7 +14,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.regex.Pattern;
-import java.util.stream.IntStream;
public class JniProcessSession implements ProcessSession {
@@ -41,7 +39,6 @@ public class JniProcessSession implements ProcessSession {
@Override
public void adjustCounter(String name, long delta, boolean immediate) {
-
}
@Override
@@ -173,20 +170,39 @@ public class JniProcessSession implements ProcessSession {
}
}
+ /**
+ * I don't like surrounding this with a Buffered Input Stream, but it
seems that certain features expect this
+ * Case in point, CSV Reader:
+ * createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger)
+ *
+ * In this method we've erased the concrete type and are assuming the
InputStream is a BufferedInputStream.
+ * While we can fix this, there is no guarantee that others don't abide by
this. As a result we'll use
+ * BufferedInputStream here until we can safely move away.
+ */
@Override
public void read(FlowFile source, InputStreamCallback reader) throws
FlowFileAccessException {
try {
- final JniInputStream input = readFlowFile(source);
+ final BufferedInputStream input = new BufferedInputStream(
readFlowFile(source) );
if (input != null)
reader.process(input);
} catch (IOException e) {
+ e.printStackTrace();
throw new FlowFileAccessException("Could not read from native
source", e);
}
}
@Override
public InputStream read(FlowFile flowFile) {
- return readFlowFile(flowFile);
+ /**
+ * I don't like surrounding this with a Buffered Input Stream, but it
seems that certain features expect this
+ * Case in point, CSV Reader:
+ * createRecordReader(final Map<String, String> variables, final
InputStream in, final ComponentLog logger)
+ *
+ * In this method we've erased the concrete type and are assuming the
InputStream is a BufferedInputStream.
+ * While we can fix this, there is no guarantee that others don't
abide by this. As a result we'll use
+ * BufferedInputStream here until we can safely move away.
+ */
+ return new BufferedInputStream(readFlowFile(flowFile));
}
@Override
@@ -230,7 +246,7 @@ public class JniProcessSession implements ProcessSession {
ByteArrayOutputStream bin = new ByteArrayOutputStream();
@Override
public void write(int b) throws IOException {
- synchronized (bin) {
+ synchronized (this) {
bin.write(b);
// better suited to writing pages of memory
if (bin.size() > 4096) {
@@ -241,7 +257,16 @@ public class JniProcessSession implements ProcessSession {
@Override
public void flush() throws IOException {
- synchronized (bin) {
+ synchronized (this) {
+ // flush as an append.
+ flushByterArray();
+ }
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ synchronized (this) {
// flush as an append.
flushByterArray();
}
@@ -251,6 +276,7 @@ public class JniProcessSession implements ProcessSession {
append(source,bin.toByteArray());
bin = new ByteArrayOutputStream();
}
+
};
}
@@ -283,12 +309,30 @@ public class JniProcessSession implements ProcessSession {
return source;
}
+ /**
+ * IOUtils was slow due to non-buffering. Underlying streams may buffer
but this alleviates
+ * pressure with a small footprint.
+ * @param in input stream
+ * @param out output stream
+ * @throws IOException
+ */
+ private static void copyData(InputStream in, OutputStream out) throws
IOException {
+ byte[] buffer = new byte[1 * 1024];
+ int len;
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ }
+
+
@Override
public FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile
destination){
try {
- IOUtils.copy(Files.newInputStream(source),write(destination));
- if (!keepSourceFile){
- Files.delete(source);
+ try(OutputStream out = write(destination)) {
+ copyData(Files.newInputStream(source), out);
+ if (!keepSourceFile) {
+ Files.delete(source);
+ }
}
} catch (IOException e) {
return null;
@@ -299,7 +343,9 @@ public class JniProcessSession implements ProcessSession {
@Override
public FlowFile importFrom(InputStream source, FlowFile destination){
try {
- IOUtils.copy(source,write(destination));
+ try(OutputStream out = write(destination)) {
+ copyData(source, out);
+ }
} catch (IOException e) {
return null;
}
diff --git a/extensions/pythonprocessors/SentimentAnalysis.py
b/extensions/pythonprocessors/examples/SentimentAnalysis.py
similarity index 100%
rename from extensions/pythonprocessors/SentimentAnalysis.py
rename to extensions/pythonprocessors/examples/SentimentAnalysis.py
diff --git a/extensions/pythonprocessors/google/SentimentAnalyzer.py
b/extensions/pythonprocessors/google/SentimentAnalyzer.py
new file mode 100644
index 0000000..3d54af0
--- /dev/null
+++ b/extensions/pythonprocessors/google/SentimentAnalyzer.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+ Install the following with pip ( or pip3 )
+
+ pip install google-cloud-language
+
+ -- the following were needed during development as we saw SSL timeout errors
+ pip install requests[security]
+ pip install -U httplib2
+"""
+import json
+import sys
+import codecs
+from google.cloud import language
+from google.cloud.language import enums
+from google.cloud.language import types
+
+def describe(processor):
+ processor.setDescription("Performs a sentiment Analysis of incoming
flowfile content using Google Cloud.")
+
+def onInitialize(processor):
+ # is required,
+ processor.addProperty("Credentials Path","Path to your Google Credentials
JSON File. Must exist on agent hosts.","", True, False)
+
+class ContentExtract(object):
+ def __init__(self):
+ self.content = None
+
+ def process(self, input_stream):
+ self.content = codecs.getreader('utf-8')(input_stream).read()
+ return len(self.content)
+
+
+def onTrigger(context, session):
+ flow_file = session.get()
+ if flow_file is not None:
+ credentials_filename = context.getProperty("Credentials Path")
+ sentiment = ContentExtract()
+ session.read(flow_file,sentiment)
+ client =
language.LanguageServiceClient.from_service_account_json(credentials_filename)
+ document =
types.Document(content=sentiment.content,type=enums.Document.Type.PLAIN_TEXT)
+
+ annotations = client.analyze_sentiment(document=document, retry =
None,timeout=1.0 )
+ score = annotations.document_sentiment.score
+ magnitude = annotations.document_sentiment.magnitude
+
+ flow_file.addAttribute("score",str(score))
+ flow_file.addAttribute("magnitude",str(magnitude))
+ session.transfer(flow_file, REL_SUCCESS)
+
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 5842bf2..bdcc3e5 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -61,11 +61,12 @@ void DatabaseContentRepository::stop() {
}
}
-std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const
std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const
std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
// the traditional approach with these has been to return -1 from the
stream; however, since we have the ability here
// we can simply return a nullptr, which is also valid from the API when
this stream is not valid.
if (nullptr == claim || !is_valid_ || !db_)
return nullptr;
+ // append is already supported in all modes
return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_,
true);
}
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 7e5705f..6d12460 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -86,7 +86,7 @@ class DatabaseContentRepository : public
core::ContentRepository, public core::C
virtual void stop();
- virtual std::shared_ptr<io::BaseStream> write(const
std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual std::shared_ptr<io::BaseStream> write(const
std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
virtual std::shared_ptr<io::BaseStream> read(const
std::shared_ptr<minifi::ResourceClaim> &claim);
diff --git a/extensions/script/python/ExecutePythonProcessor.cpp
b/extensions/script/python/ExecutePythonProcessor.cpp
index 249ff09..66d8bf6 100644
--- a/extensions/script/python/ExecutePythonProcessor.cpp
+++ b/extensions/script/python/ExecutePythonProcessor.cpp
@@ -55,13 +55,12 @@ void ExecutePythonProcessor::initialize() {
properties.insert(ScriptFile);
properties.insert(ModuleDirectory);
setSupportedProperties(properties);
- setSupportedProperties(std::move(properties));
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Failure);
setSupportedRelationships(std::move(relationships));
-
+ setAcceptAllProperties();
if (!prop.empty()) {
setProperty(ScriptFile, prop);
std::shared_ptr<script::ScriptEngine> engine;
diff --git a/extensions/script/python/PythonCreator.h
b/extensions/script/python/PythonCreator.h
index fb87592..0a320d0 100644
--- a/extensions/script/python/PythonCreator.h
+++ b/extensions/script/python/PythonCreator.h
@@ -93,11 +93,17 @@ class PythonCreator : public minifi::core::CoreComponent {
core::ClassLoader::getDefaultClassLoader().registerResource("",
"createPyProcFactory");
for (const auto &path : classpaths_) {
-
const auto &scriptName = getScriptName(path);
utils::Identifier uuid;
- auto processor =
std::dynamic_pointer_cast<core::Processor>(core::ClassLoader::getDefaultClassLoader().instantiate(scriptName,
uuid));
+
+ std::string loadName = scriptName;
+ const auto &package = getPackage(pathListings, path);
+
+ if (!package.empty())
+ loadName = "org.apache.nifi.minifi.processors." + package + "." +
scriptName;
+
+ auto processor =
std::dynamic_pointer_cast<core::Processor>(core::ClassLoader::getDefaultClassLoader().instantiate(loadName,
uuid));
if (processor) {
try {
processor->initialize();
@@ -112,6 +118,7 @@ class PythonCreator : public minifi::core::CoreComponent {
details.artifact = getFileName(path);
details.version = minifi::AgentBuild::VERSION;
details.group = "python";
+
minifi::ClassDescription description(script_with_package);
description.dynamic_properties_ =
proc->getPythonSupportDynamicProperties();
auto properties = proc->getPythonProperties();
diff --git a/libminifi/include/core/StreamManager.h
b/libminifi/include/core/StreamManager.h
index 5de9dbe..65e0414 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -48,7 +48,7 @@ class StreamManager {
* @param streamId stream identifier
* @return stream pointer.
*/
- virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T>
&streamId) = 0;
+ virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T>
&streamId, bool append = false) = 0;
/**
* Create a read stream using the streamId as a reference.
diff --git a/libminifi/include/core/repository/FileSystemRepository.h
b/libminifi/include/core/repository/FileSystemRepository.h
index 4a5ad5e..56a103c 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -49,7 +49,7 @@ class FileSystemRepository : public core::ContentRepository,
public core::CoreCo
bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
- virtual std::shared_ptr<io::BaseStream> write(const
std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual std::shared_ptr<io::BaseStream> write(const
std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
virtual std::shared_ptr<io::BaseStream> read(const
std::shared_ptr<minifi::ResourceClaim> &claim);
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h
b/libminifi/include/core/repository/VolatileContentRepository.h
index e203e96..d792329 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -77,7 +77,7 @@ class VolatileContentRepository : public
core::ContentRepository, public virtual
* @param claim resource claim
* @return BaseStream shared pointer that represents the stream the consumer
will write to.
*/
- virtual std::shared_ptr<io::BaseStream> write(const
std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual std::shared_ptr<io::BaseStream> write(const
std::shared_ptr<minifi::ResourceClaim> &claim, bool append);
/**
* Creates readable stream.
diff --git a/libminifi/include/io/FileStream.h
b/libminifi/include/io/FileStream.h
index 94d13b2..863f069 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -50,8 +50,10 @@ class FileStream : public io::BaseStream {
/**
* File Stream constructor that accepts an fstream shared pointer.
* It must already be initialized for read and write.
+ * @param path path to file
+ * @param append identifies if this is an append or overwriting the file
*/
- explicit FileStream(const std::string &path);
+ explicit FileStream(const std::string &path, bool append = false);
virtual ~FileStream() {
closeStream();
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index dcc9a78..40ee900 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -298,7 +298,6 @@ void ProcessSession::write(const
std::shared_ptr<core::FlowFile> &flow, OutputSt
void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow,
OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = nullptr;
-
if (flow->getResourceClaim() == nullptr) {
// No existed claim for append, we need to create new claim
return write(flow, callback);
@@ -308,20 +307,23 @@ void ProcessSession::append(const
std::shared_ptr<core::FlowFile> &flow, OutputS
try {
uint64_t startTime = getTimeMillis();
- std::shared_ptr<io::BaseStream> stream =
process_context_->getContentRepository()->write(claim);
+ std::shared_ptr<io::BaseStream> stream =
process_context_->getContentRepository()->write(claim, true);
if (nullptr == stream) {
rollback();
return;
}
// Call the callback to write the content
+
size_t oldPos = stream->getSize();
- stream->seek(oldPos + 1);
+ // this prevents an issue if we write, above, with zero length.
+ if (oldPos > 0)
+ stream->seek(oldPos + 1);
if (callback->process(stream) < 0) {
rollback();
return;
}
uint64_t appendSize = stream->getSize() - oldPos;
- flow->setSize(flow->getSize() + appendSize);
+ flow->setSize(stream->getSize());
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify
flow record content " << flow->getUUIDStr();
@@ -342,6 +344,10 @@ void ProcessSession::read(const
std::shared_ptr<core::FlowFile> &flow, InputStre
if (flow->getResourceClaim() == nullptr) {
// No existed claim for read, we throw exception
+ logger_->log_debug("For %s, no resource claim but size is %d",
flow->getUUIDStr(), flow->getSize());
+ if (flow->getSize() == 0) {
+ return;
+ }
throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for
read");
}
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp
b/libminifi/src/core/repository/FileSystemRepository.cpp
index c0f1694..4607d74 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -42,8 +42,8 @@ bool FileSystemRepository::initialize(const
std::shared_ptr<minifi::Configure> &
void FileSystemRepository::stop() {
}
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const
std::shared_ptr<minifi::ResourceClaim> &claim) {
- return std::make_shared<io::FileStream>(claim->getContentFullPath());
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const
std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
+ return std::make_shared<io::FileStream>(claim->getContentFullPath(), append);
}
bool FileSystemRepository::exists(const std::shared_ptr<minifi::ResourceClaim>
&streamId) {
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp
b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 674566b..9b5f9be 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -87,7 +87,7 @@ void VolatileContentRepository::start() {
logger_->log_info("%s Repository Monitor Thread Start", getName());
}
-std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const
std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const
std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
logger_->log_info("enter write for %s", claim->getContentFullPath());
{
std::lock_guard<std::mutex> lock(map_mutex_);
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 20cf7a9..6ee0e95 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -28,12 +28,15 @@ namespace nifi {
namespace minifi {
namespace io {
-FileStream::FileStream(const std::string &path)
+FileStream::FileStream(const std::string &path, bool append)
: logger_(logging::LoggerFactory<FileStream>::getLogger()),
path_(path),
offset_(0) {
file_stream_ = std::unique_ptr<std::fstream>(new std::fstream());
- file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
+ if (append)
+ file_stream_->open(path.c_str(), std::fstream::in | std::fstream::out |
std::fstream::app | std::fstream::binary);
+ else
+ file_stream_->open(path.c_str(), std::fstream::out | std::fstream::binary);
file_stream_->seekg(0, file_stream_->end);
file_stream_->seekp(0, file_stream_->end);
int len = file_stream_->tellg();