Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2287#discussion_r161785583
  
    --- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
 ---
    @@ -212,18 +224,38 @@ public void process(final InputStream in) throws 
IOException {
                                 String json = IOUtils.toString(in, charset)
                                         .replace("\r\n", " ").replace('\n', ' 
').replace('\r', ' ');
     
    -                            if (indexOp.equalsIgnoreCase("index")) {
    -                                
bulk.add(esClient.get().prepareIndex(index, docType, id)
    -                                        
.setSource(json.getBytes(charset)));
    -                            } else if (indexOp.equalsIgnoreCase("upsert")) 
{
    -                                
bulk.add(esClient.get().prepareUpdate(index, docType, id)
    -                                        .setDoc(json.getBytes(charset))
    -                                        .setDocAsUpsert(true));
    -                            } else if (indexOp.equalsIgnoreCase("update")) 
{
    -                                
bulk.add(esClient.get().prepareUpdate(index, docType, id)
    -                                        .setDoc(json.getBytes(charset)));
    -                            } else {
    -                                throw new IOException("Index operation: " 
+ indexOp + " not supported.");
    +                            switch(indexOp.toLowerCase()) {
    +                                                           case "index": {
    +                                                                   if 
(version != null) {
    +                                                                           
bulk.add(esClient.get().prepareIndex(index, docType, id)
    +                                                                           
                .setVersion(version).setVersionType(VersionType.EXTERNAL)
    +                                                                           
                .setSource(json.getBytes(charset)));
    +                                                                   } else {
    +                                                                           
bulk.add(esClient.get().prepareIndex(index, docType, id)
    +                                                                           
                .setSource(json.getBytes(charset)));
    +                                                                   }
    +                                                                   break;
    +                                                           }
    +                                                           case "upsert": {
    +                                                                   if 
(version != null) {
    --- End diff --
    
    This is a good runtime check, we could also add code to a customValidate() 
method that would check if we can resolve the index operation and version (no 
EL present, e.g.) and mark the processor as invalid if the index operation is 
not "index" and the version is not empty. We still need the runtime check(s) 
since we may not know the operation/version combo until runtime, but it is 
always helpful to mark a processor as invalid as early as possible. This is not 
a requirement, just a suggestion.


---

Reply via email to