mattyb149 commented on a change in pull request #2861: NIFI-5248 Added new 
Elasticsearch json and record processors.
URL: https://github.com/apache/nifi/pull/2861#discussion_r345340948
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 ##########
 @@ -174,68 +169,151 @@ private Response runQuery(String endpoint, String 
query, String index, String ty
 
         HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
 
-        return client.performRequest("POST", sb.toString(), 
Collections.emptyMap(), queryEntity);
+        try {
+            return client.performRequest("POST", sb.toString(), 
Collections.emptyMap(), queryEntity);
+        } catch (Exception e) {
+            throw new ElasticsearchError(e);
+        }
     }
 
-    private Map<String, Object> parseResponse(Response response) throws 
IOException {
+    private Map<String, Object> parseResponse(Response response) {
         final int code = response.getStatusLine().getStatusCode();
 
-        if (code >= 200 & code < 300) {
-            InputStream inputStream = response.getEntity().getContent();
-            byte[] result = IOUtils.toByteArray(inputStream);
-            inputStream.close();
-            return mapper.readValue(new String(result, charset), Map.class);
-        } else {
-            String errorMessage = String.format("ElasticSearch reported an 
error while trying to run the query: %s",
-                response.getStatusLine().getReasonPhrase());
-            throw new IOException(errorMessage);
+        try {
+            if (code >= 200 & code < 300) {
+                InputStream inputStream = response.getEntity().getContent();
+                byte[] result = IOUtils.toByteArray(inputStream);
+                inputStream.close();
+                return mapper.readValue(new String(result, charset), 
Map.class);
+            } else {
+                String errorMessage = String.format("ElasticSearch reported an 
error while trying to run the query: %s",
+                        response.getStatusLine().getReasonPhrase());
+                throw new IOException(errorMessage);
+            }
+        } catch (Exception ex) {
+            throw new ElasticsearchError(ex);
         }
     }
 
     @Override
-    public IndexOperationResponse add(IndexOperationRequest operation) throws 
IOException {
-        return add(Arrays.asList(operation));
+    public IndexOperationResponse add(IndexOperationRequest operation) {
+        return bulk(Arrays.asList(operation));
     }
 
-    @Override
-    public IndexOperationResponse add(List<IndexOperationRequest> operations) 
throws IOException {
-        BulkRequest bulkRequest = new BulkRequest();
-        for (int index = 0; index < operations.size(); index++) {
-            IndexOperationRequest or = operations.get(index);
-            IndexRequest indexRequest = new IndexRequest(or.getIndex(), 
or.getType(), or.getId())
-                .source(or.getFields());
-            bulkRequest.add(indexRequest);
+    private String flatten(String str) {
+        return str.replaceAll("[\\n\\r]", "\\\\n");
+    }
+
+    private String buildBulkHeader(IndexOperationRequest request) throws 
JsonProcessingException {
+        String operation = 
request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
+                ? "update"
+                : request.getOperation().getValue();
+        return buildBulkHeader(operation, request.getIndex(), 
request.getType(), request.getId());
+    }
+
+    private String buildBulkHeader(String operation, String index, String 
type, String id) throws JsonProcessingException {
+        Map<String, Object> header = new HashMap<String, Object>() {{
+            put(operation, new HashMap<String, Object>() {{
+                put("_index", index);
+                put("_id", id);
+                put("_type", type);
+            }});
+        }};
+
+        return flatten(mapper.writeValueAsString(header));
+    }
+
+    protected void buildRequest(IndexOperationRequest request, StringBuilder 
builder) throws JsonProcessingException {
+        String header = buildBulkHeader(request);
+        builder.append(header).append("\n");
+        if 
(request.getOperation().equals(IndexOperationRequest.Operation.Index)) {
+            String indexDocument = 
mapper.writeValueAsString(request.getFields());
+            builder.append(indexDocument).append("\n");
+        } else if 
(request.getOperation().equals(IndexOperationRequest.Operation.Update)
+            || 
request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
+            Map<String, Object> doc = new HashMap<String, Object>(){{
+                put("doc", request.getFields());
+                if 
(request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
+                    put("doc_as_upsert", true);
+                }
+            }};
+            String update = flatten(mapper.writeValueAsString(doc)).trim();
+            builder.append(String.format("%s\n", update));
         }
+    }
+
+    @Override
+    public IndexOperationResponse bulk(List<IndexOperationRequest> operations) 
{
+        try {
+            StringBuilder payload = new StringBuilder();
+            for (int index = 0; index < operations.size(); index++) {
+                IndexOperationRequest or = operations.get(index);
+                buildRequest(or, payload);
+            }
+
+            getLogger().info(payload.toString());
 
 Review comment:
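   This `getLogger().info(payload.toString())` call should log at DEBUG or TRACE level rather than INFO, since it writes the entire bulk request payload to the log.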

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to