This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a6e28e NIFI-6289: using charset for byte encoding in ExecuteSparkInteractive
3a6e28e is described below
commit 3a6e28eaa4e4fea7080057d02a00b189d2a7d1bd
Author: Endre Zoltan Kovacs <[email protected]>
AuthorDate: Fri May 10 16:02:48 2019 +0200
NIFI-6289: using charset for byte encoding in ExecuteSparkInteractive
This closes #3468
Signed-off-by: Mike Thomsen <[email protected]>
---
.../apache/nifi/processors/livy/ExecuteSparkInteractive.java | 12 +++---------
1 file changed, 3 insertions(+), 9 deletions(-)
diff --git a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
index d8ca9e1..b9edcac 100644
--- a/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
+++ b/nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java
@@ -184,13 +184,7 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
return;
}
final long statusCheckInterval = context.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
- Charset charset;
- try {
- charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
- } catch (Exception e) {
- log.warn("Illegal character set name specified, defaulting to UTF-8");
- charset = StandardCharsets.UTF_8;
- }
+ Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
String sessionId = livyController.get("sessionId");
String livyUrl = livyController.get("livyUrl");
@@ -217,13 +211,13 @@ public class ExecuteSparkInteractive extends AbstractProcessor {
} else {
try {
final JSONObject output = result.getJSONObject("data");
- flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
+ flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes(charset)));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
session.transfer(flowFile, REL_SUCCESS);
} catch (JSONException je) {
// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
- flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+ flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes(charset)));
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);