This is an automated email from the ASF dual-hosted git repository.
edbe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1a443c7 NIFI-5937 use processor-configured encoding instead of the
system default
1a443c7 is described below
commit 1a443c73ecb0ed00233b66d282031aa5b37afb24
Author: Alex Savitsky <[email protected]>
AuthorDate: Tue Jan 8 10:15:45 2019 -0500
NIFI-5937 use processor-configured encoding instead of the system default
NIFI-5937 added tests to verify that accented characters are preserved
correctly
NIFI-5937 unfolding starred imports
NIFI-5937 unfolding starred imports (now with statics)
Signed-off-by: Ed <[email protected]>
This closes #3250
---
.../elasticsearch/PutElasticsearchHttpRecord.java | 5 +-
.../TestPutElasticsearchHttpRecord.java | 94 ++++++++++++++++------
2 files changed, 73 insertions(+), 26 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index ac36604..52de424 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -73,6 +73,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -198,6 +199,7 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX);
descriptors.add(TYPE);
+ descriptors.add(CHARSET);
descriptors.add(INDEX_OP);
descriptors.add(SUPPRESS_NULLS);
@@ -313,6 +315,7 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
final String id_path =
context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath recordPath = StringUtils.isEmpty(id_path) ? null :
recordPathCache.getCompiled(id_path);
final StringBuilder sb = new StringBuilder();
+ final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
int recordCount = 0;
try (final InputStream in = session.read(flowFile);
@@ -345,7 +348,7 @@ public class PutElasticsearchHttpRecord extends
AbstractElasticsearchHttpProcess
writeRecord(record, record.getSchema(), generator);
generator.flush();
generator.close();
- json.append(out.toString());
+ json.append(out.toString(charset.name()));
buildBulkCommand(sb, index, docType, indexOp, id,
json.toString());
recordCount++;
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 862e177..2cc16c1 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -16,18 +16,15 @@
*/
package org.apache.nifi.processors.elasticsearch;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -42,16 +39,21 @@ import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
-import okhttp3.Call;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Protocol;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
-public class TestPutElasticsearchHttpRecord {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+public class TestPutElasticsearchHttpRecord {
private TestRunner runner;
@After
@@ -61,7 +63,25 @@ public class TestPutElasticsearchHttpRecord {
@Test
public void testPutElasticSearchOnTriggerIndex() throws IOException {
- runner = TestRunners.newTestRunner(new
PutElasticsearchHttpRecordTestProcessor(false)); // no failures
+ PutElasticsearchHttpRecordTestProcessor processor = new
PutElasticsearchHttpRecordTestProcessor(false);
+ processor.setRecordChecks(record -> {
+ assertEquals(1, record.get("id"));
+ assertEquals("reç1", record.get("name"));
+ assertEquals(101, record.get("code"));
+ }, record -> {
+ assertEquals(2, record.get("id"));
+ assertEquals("ræc2", record.get("name"));
+ assertEquals(102, record.get("code"));
+ }, record -> {
+ assertEquals(3, record.get("id"));
+ assertEquals("rèc3", record.get("name"));
+ assertEquals(103, record.get("code"));
+ }, record -> {
+ assertEquals(4, record.get("id"));
+ assertEquals("rëc4", record.get("name"));
+ assertEquals(104, record.get("code"));
+ });
+ runner = TestRunners.newTestRunner(processor); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://127.0.0.1:9200");
@@ -368,6 +388,7 @@ public class TestPutElasticsearchHttpRecord {
int statusCode = 200;
String statusMessage = "OK";
String expectedUrl = null;
+ Consumer<Map>[] recordChecks;
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
this.responseHasFailures = responseHasFailures;
@@ -382,6 +403,11 @@ public class TestPutElasticsearchHttpRecord {
expectedUrl = url;
}
+ @SafeVarargs
+ final void setRecordChecks(Consumer<Map>... checks) {
+ recordChecks = checks;
+ }
+
@Override
protected void createElasticsearchClient(ProcessContext context)
throws ProcessException {
client = mock(OkHttpClient.class);
@@ -391,6 +417,24 @@ public class TestPutElasticsearchHttpRecord {
if (statusCode != -1) {
Request realRequest = (Request)
invocationOnMock.getArguments()[0];
assertTrue((expectedUrl == null) ||
(expectedUrl.equals(realRequest.url().toString())));
+ if (recordChecks != null) {
+ final ObjectMapper mapper = new ObjectMapper();
+ Buffer sink = new Buffer();
+ realRequest.body().writeTo(sink);
+ String line;
+ int recordIndex = 0;
+ boolean content = false;
+ while ((line = sink.readUtf8Line()) != null) {
+ if (content) {
+ content = false;
+ if (recordIndex < recordChecks.length) {
+
recordChecks[recordIndex++].accept(mapper.readValue(line, Map.class));
+ }
+ } else {
+ content = true;
+ }
+ }
+ }
StringBuilder sb = new StringBuilder("{\"took\": 1,
\"errors\": \"");
sb.append(responseHasFailures);
sb.append("\", \"items\": [");
@@ -521,9 +565,9 @@ public class TestPutElasticsearchHttpRecord {
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
- parser.addRecord(1, "rec1", 101);
- parser.addRecord(2, "rec2", 102);
- parser.addRecord(3, "rec3", 103);
- parser.addRecord(4, "rec4", 104);
+ parser.addRecord(1, "reç1", 101);
+ parser.addRecord(2, "ræc2", 102);
+ parser.addRecord(3, "rèc3", 103);
+ parser.addRecord(4, "rëc4", 104);
}
}