This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new dc38a6b  NIFI-6288: hooking in & using charset to HTTP based ES 
processors
dc38a6b is described below

commit dc38a6bdceec0095ae322f225401b3f505af1da5
Author: Endre Zoltan Kovacs <[email protected]>
AuthorDate: Fri May 10 13:10:20 2019 +0200

    NIFI-6288: hooking in & using charset to HTTP based ES processors
    
    This closes #3467
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../elasticsearch/AbstractElasticsearchHttpProcessor.java      |  1 +
 .../nifi/processors/elasticsearch/FetchElasticsearchHttp.java  |  4 +++-
 .../nifi/processors/elasticsearch/PutElasticsearchHttp.java    |  1 -
 .../processors/elasticsearch/PutElasticsearchHttpRecord.java   |  1 -
 .../nifi/processors/elasticsearch/QueryElasticsearchHttp.java  |  8 +++++---
 .../nifi/processors/elasticsearch/ScrollElasticsearchHttp.java | 10 ++++++----
 6 files changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 8cb34a1..5787e59 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -149,6 +149,7 @@ public abstract class AbstractElasticsearchHttpProcessor 
extends AbstractElastic
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(ES_URL);
         properties.add(PROP_SSL_CONTEXT_SERVICE);
+        properties.add(CHARSET);
         properties.add(USERNAME);
         properties.add(PASSWORD);
         properties.add(CONNECT_TIMEOUT);
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index e782ba3..fcdc9e3 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -45,6 +45,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -202,6 +203,7 @@ public class FetchElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
         // Authentication
         final String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
         final String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         final ComponentLog logger = getLogger();
 
@@ -234,7 +236,7 @@ public class FetchElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                     flowFile = session.putAttribute(flowFile, "es.type", 
retrievedType);
                     if (source != null) {
                         flowFile = session.write(flowFile, out -> {
-                            out.write(source.toString().getBytes());
+                            out.write(source.toString().getBytes(charset));
                         });
                     }
                     logger.debug("Elasticsearch document " + retrievedId + " 
fetched, routing to success");
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 59fc029..d572b3d 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -155,7 +155,6 @@ public class PutElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
         descriptors.add(ID_ATTRIBUTE);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
-        descriptors.add(CHARSET);
         descriptors.add(BATCH_SIZE);
         descriptors.add(INDEX_OP);
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 87dc5c3..7f6140f 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -266,7 +266,6 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
         descriptors.add(ID_RECORD_PATH);
         descriptors.add(INDEX);
         descriptors.add(TYPE);
-        descriptors.add(CHARSET);
         descriptors.add(INDEX_OP);
         descriptors.add(SUPPRESS_NULLS);
         descriptors.add(DATE_FORMAT);
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 67deb87..7cb1fb4 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -49,6 +49,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -316,6 +317,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
         // Authentication
         final String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         final ComponentLog logger = getLogger();
 
@@ -344,7 +346,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
                 numResults = this.getPage(getResponse, queryUrl, context, 
session, flowFile,
-                        logger, startNanos, targetIsContent, numResults);
+                        logger, startNanos, targetIsContent, numResults, 
charset);
                 fromIndex += pageSize;
                 getResponse.close();
             }
@@ -381,7 +383,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
 
     private int getPage(final Response getResponse, final URL url, final 
ProcessContext context,
             final ProcessSession session, FlowFile flowFile, final 
ComponentLog logger,
-            final long startNanos, boolean targetIsContent, int 
priorResultCount)
+            final long startNanos, boolean targetIsContent, int 
priorResultCount, Charset charset)
             throws IOException {
         List<FlowFile> page = new ArrayList<>();
         final int statusCode = getResponse.code();
@@ -426,7 +428,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                     documentFlowFile = session.putAttribute(documentFlowFile, 
"filename", retrievedId);
                     documentFlowFile = session.putAttribute(documentFlowFile, 
"mime.type", "application/json");
                     documentFlowFile = session.write(documentFlowFile, out -> {
-                        out.write(source.toString().getBytes());
+                        out.write(source.toString().getBytes(charset));
                     });
                 } else {
                     Map<String, String> attributes = new HashMap<>();
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 01e5ae1..528f88c 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -51,6 +51,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -247,6 +248,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
         // Authentication
         final String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
         final String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
 
         final ComponentLog logger = getLogger();
 
@@ -268,7 +270,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
 
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, scrollurl,
                         username, password, "POST", body);
-                this.getPage(getResponse, scrollurl, context, session, 
flowFile, logger, startNanos);
+                this.getPage(getResponse, scrollurl, context, session, 
flowFile, logger, startNanos, charset);
                 getResponse.close();
             } else {
                 logger.debug("Querying {}/{} from Elasticsearch: {}", new 
Object[] { index,
@@ -281,7 +283,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
 
                 final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl,
                         username, password, "GET", null);
-                this.getPage(getResponse, queryUrl, context, session, 
flowFile, logger, startNanos);
+                this.getPage(getResponse, queryUrl, context, session, 
flowFile, logger, startNanos, charset);
                 getResponse.close();
             }
 
@@ -302,7 +304,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
     }
 
     private void getPage(final Response getResponse, final URL url, final 
ProcessContext context,
-            final ProcessSession session, FlowFile flowFile, final 
ComponentLog logger, final long startNanos)
+            final ProcessSession session, FlowFile flowFile, final 
ComponentLog logger, final long startNanos, Charset charset)
             throws IOException {
         final int statusCode = getResponse.code();
 
@@ -342,7 +344,7 @@ public class ScrollElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor
             logger.debug("Elasticsearch retrieved " + responseJson.size() + " 
documents, routing to success");
 
             flowFile = session.write(flowFile, out -> {
-                out.write(builder.toString().getBytes());
+                out.write(builder.toString().getBytes(charset));
             });
             session.transfer(flowFile, REL_SUCCESS);
 

Reply via email to