Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2287#discussion_r161785583
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
---
@@ -212,18 +224,38 @@ public void process(final InputStream in) throws
IOException {
String json = IOUtils.toString(in, charset)
.replace("\r\n", " ").replace('\n', '
').replace('\r', ' ');
- if (indexOp.equalsIgnoreCase("index")) {
-
bulk.add(esClient.get().prepareIndex(index, docType, id)
-
.setSource(json.getBytes(charset)));
- } else if (indexOp.equalsIgnoreCase("upsert"))
{
-
bulk.add(esClient.get().prepareUpdate(index, docType, id)
- .setDoc(json.getBytes(charset))
- .setDocAsUpsert(true));
- } else if (indexOp.equalsIgnoreCase("update"))
{
-
bulk.add(esClient.get().prepareUpdate(index, docType, id)
- .setDoc(json.getBytes(charset)));
- } else {
- throw new IOException("Index operation: "
+ indexOp + " not supported.");
+ switch(indexOp.toLowerCase()) {
+ case "index": {
+ if
(version != null) {
+
bulk.add(esClient.get().prepareIndex(index, docType, id)
+
.setVersion(version).setVersionType(VersionType.EXTERNAL)
+
.setSource(json.getBytes(charset)));
+ } else {
+
bulk.add(esClient.get().prepareIndex(index, docType, id)
+
.setSource(json.getBytes(charset)));
+ }
+ break;
+ }
+ case "upsert": {
+ if
(version != null) {
--- End diff --
This is a good runtime check, we could also add code to a customValidate()
method that would check if we can resolve the index operation and version (no
EL present, e.g.) and mark the processor as invalid if the index operation is
not "index" and the version is not empty. We still need the runtime check(s)
since we may not know the operation/version combo until runtime, but it is
always helpful to mark a processor as invalid as early as possible. This is not
a requirement, just a suggestion.
---