This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new dc38a6b NIFI-6288: hooking in & using charset to HTTP based ES processors
dc38a6b is described below
commit dc38a6bdceec0095ae322f225401b3f505af1da5
Author: Endre Zoltan Kovacs <[email protected]>
AuthorDate: Fri May 10 13:10:20 2019 +0200
NIFI-6288: hooking in & using charset to HTTP based ES processors
This closes #3467
Signed-off-by: Mike Thomsen <[email protected]>
---
.../elasticsearch/AbstractElasticsearchHttpProcessor.java | 1 +
.../nifi/processors/elasticsearch/FetchElasticsearchHttp.java | 4 +++-
.../nifi/processors/elasticsearch/PutElasticsearchHttp.java | 1 -
.../processors/elasticsearch/PutElasticsearchHttpRecord.java | 1 -
.../nifi/processors/elasticsearch/QueryElasticsearchHttp.java | 8 +++++---
.../nifi/processors/elasticsearch/ScrollElasticsearchHttp.java | 10 ++++++----
6 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 8cb34a1..5787e59 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -149,6 +149,7 @@ public abstract class AbstractElasticsearchHttpProcessor
extends AbstractElastic
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ES_URL);
properties.add(PROP_SSL_CONTEXT_SERVICE);
+ properties.add(CHARSET);
properties.add(USERNAME);
properties.add(PASSWORD);
properties.add(CONNECT_TIMEOUT);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index e782ba3..fcdc9e3 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -45,6 +45,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -202,6 +203,7 @@ public class FetchElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
// Authentication
final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
final String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
@@ -234,7 +236,7 @@ public class FetchElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
flowFile = session.putAttribute(flowFile, "es.type",
retrievedType);
if (source != null) {
flowFile = session.write(flowFile, out -> {
- out.write(source.toString().getBytes());
+ out.write(source.toString().getBytes(charset));
});
}
logger.debug("Elasticsearch document " + retrievedId + "
fetched, routing to success");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index 59fc029..d572b3d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -155,7 +155,6 @@ public class PutElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
- descriptors.add(CHARSET);
descriptors.add(BATCH_SIZE);
descriptors.add(INDEX_OP);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 87dc5c3..7f6140f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -266,7 +266,6 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX);
descriptors.add(TYPE);
- descriptors.add(CHARSET);
descriptors.add(INDEX_OP);
descriptors.add(SUPPRESS_NULLS);
descriptors.add(DATE_FORMAT);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 67deb87..7cb1fb4 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -49,6 +49,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -316,6 +317,7 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
// Authentication
final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
@@ -344,7 +346,7 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
final Response getResponse =
sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
numResults = this.getPage(getResponse, queryUrl, context,
session, flowFile,
- logger, startNanos, targetIsContent, numResults);
+ logger, startNanos, targetIsContent, numResults,
charset);
fromIndex += pageSize;
getResponse.close();
}
@@ -381,7 +383,7 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
private int getPage(final Response getResponse, final URL url, final
ProcessContext context,
final ProcessSession session, FlowFile flowFile, final
ComponentLog logger,
- final long startNanos, boolean targetIsContent, int
priorResultCount)
+ final long startNanos, boolean targetIsContent, int
priorResultCount, Charset charset)
throws IOException {
List<FlowFile> page = new ArrayList<>();
final int statusCode = getResponse.code();
@@ -426,7 +428,7 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
documentFlowFile = session.putAttribute(documentFlowFile,
"filename", retrievedId);
documentFlowFile = session.putAttribute(documentFlowFile,
"mime.type", "application/json");
documentFlowFile = session.write(documentFlowFile, out -> {
- out.write(source.toString().getBytes());
+ out.write(source.toString().getBytes(charset));
});
} else {
Map<String, String> attributes = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 01e5ae1..528f88c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -51,6 +51,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -247,6 +248,7 @@ public class ScrollElasticsearchHttp extends
AbstractElasticsearchHttpProcessor
// Authentication
final String username =
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password =
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
@@ -268,7 +270,7 @@ public class ScrollElasticsearchHttp extends
AbstractElasticsearchHttpProcessor
final Response getResponse =
sendRequestToElasticsearch(okHttpClient, scrollurl,
username, password, "POST", body);
- this.getPage(getResponse, scrollurl, context, session,
flowFile, logger, startNanos);
+ this.getPage(getResponse, scrollurl, context, session,
flowFile, logger, startNanos, charset);
getResponse.close();
} else {
logger.debug("Querying {}/{} from Elasticsearch: {}", new
Object[] { index,
@@ -281,7 +283,7 @@ public class ScrollElasticsearchHttp extends
AbstractElasticsearchHttpProcessor
final Response getResponse =
sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
- this.getPage(getResponse, queryUrl, context, session,
flowFile, logger, startNanos);
+ this.getPage(getResponse, queryUrl, context, session,
flowFile, logger, startNanos, charset);
getResponse.close();
}
@@ -302,7 +304,7 @@ public class ScrollElasticsearchHttp extends
AbstractElasticsearchHttpProcessor
}
private void getPage(final Response getResponse, final URL url, final
ProcessContext context,
- final ProcessSession session, FlowFile flowFile, final
ComponentLog logger, final long startNanos)
+ final ProcessSession session, FlowFile flowFile, final
ComponentLog logger, final long startNanos, Charset charset)
throws IOException {
final int statusCode = getResponse.code();
@@ -342,7 +344,7 @@ public class ScrollElasticsearchHttp extends
AbstractElasticsearchHttpProcessor
logger.debug("Elasticsearch retrieved " + responseJson.size() + "
documents, routing to success");
flowFile = session.write(flowFile, out -> {
- out.write(builder.toString().getBytes());
+ out.write(builder.toString().getBytes(charset));
});
session.transfer(flowFile, REL_SUCCESS);