gresockj commented on a change in pull request #4693:
URL: https://github.com/apache/nifi/pull/4693#discussion_r724054441



##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -182,8 +181,8 @@ private Response runQuery(final String endpoint, final 
String query, final Strin
 
         final HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
         try {
-            return client.performRequest("POST", sb.toString(), 
requestParameters != null ? requestParameters : Collections.emptyMap(), 
queryEntity);
-        } catch (final Exception e) {
+            return performRequest("POST", sb.toString(), requestParameters, 
queryEntity);

Review comment:
       Consider `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -104,26 +103,25 @@ public void onEnabled(final ConfigurationContext context) 
{
         type  = 
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
         mapper = new ObjectMapper();
 
-        List<PropertyDescriptor> dynamic = 
context.getProperties().entrySet().stream()
-            .filter( e -> e.getKey().isDynamic())
-            .map(e -> e.getKey())
+        List<PropertyDescriptor> dynamic = 
context.getProperties().keySet().stream()
+            .filter(PropertyDescriptor::isDynamic)
             .collect(Collectors.toList());
 
-        Map<String, RecordPath> _temp = new HashMap<>();
+        Map<String, RecordPath> temp = new HashMap<>();
         for (PropertyDescriptor desc : dynamic) {
             String value = context.getProperty(desc).getValue();

Review comment:
       We could add some `final` modifiers here while we're at it.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -176,7 +174,7 @@ public PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(String name) {
         }
     }
 
-    private void validateCoordinates(Map coordinates) throws 
LookupFailureException {
+    private void validateCoordinates(Map<String, Object> coordinates) throws 
LookupFailureException {
         List<String> reasons = new ArrayList<>();

Review comment:
       Might as well make this `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class AbstractByQueryElasticsearch extends AbstractProcessor 
implements ElasticsearchRestProcessor {
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+        .description("If the \"by query\" operation fails, and a flowfile was 
read, it will be sent to this relationship.")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+        .description("If the \"by query\" operation succeeds, and a flowfile 
was read, it will be sent to this relationship.")
+        .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    private volatile ElasticSearchClientService clientService;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    abstract String tookAttribute();

Review comment:
       I think we should prefer verb-based names for these methods: 
`getTookAttribute` and `getErrorAttribute`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
##########
@@ -17,129 +17,48 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.delete.took", description 
= "The amount of time that it took to complete the delete operation in ms."),
+        @WritesAttribute(attribute = "elasticsearch.delete.error", description 
= "The error message provided by Elasticsearch if there is an error running the 
delete.")
+})
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
 @CapabilityDescription("Delete from an Elasticsearch index using a query. The 
query can be loaded from a flowfile body " +
         "or from the Query parameter.")
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
-@WritesAttributes({
-    @WritesAttribute(attribute = "elasticsearch.delete.took", description = 
"The amount of time that it took to complete the delete operation in ms."),
-    @WritesAttribute(attribute = "elasticsearch.delete.error", description = 
"The error message provided by Elasticsearch if there is an error running the 
delete.")
-})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
-        .description("If the delete by query fails, and a flowfile was read, 
it will be sent to this relationship.").build();
-
-    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
-        .description("If the delete by query succeeds, and a flowfile was 
read, it will be sent to this relationship.")
-        .build();
-
-
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")

Review comment:
       I think we should mention that these parameters will override any 
matching parameters in the request body, which might be provided by other 
non-dynamic properties.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -254,14 +271,16 @@
         return DESCRIPTORS;
     }
 
-    private RecordReaderFactory readerFactory;
-    private RecordPathCache recordPathCache;
-    private ElasticSearchClientService clientService;
-    private RecordSetWriterFactory writerFactory;
-    private boolean logErrors;
-    private volatile String dateFormat;
-    private volatile String timeFormat;
-    private volatile String timestampFormat;
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {

Review comment:
       `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+import java.util.Map;
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.update.took", description 
= "The amount of time that it took to complete the update operation in ms."),
+        @WritesAttribute(attribute = "elasticsearch.update.error", description 
= "The error message provided by Elasticsearch if there is an error running the 
update.")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "update", "query"})
+@CapabilityDescription("Update documents in an Elasticsearch index using a 
query. The query can be loaded from a flowfile body " +
+        "or from the Query parameter.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,

Review comment:
       Same comments as on the `@DynamicProperty` annotation in 
`DeleteByQueryElasticsearch` above

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -104,26 +103,25 @@ public void onEnabled(final ConfigurationContext context) 
{
         type  = 
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
         mapper = new ObjectMapper();
 
-        List<PropertyDescriptor> dynamic = 
context.getProperties().entrySet().stream()
-            .filter( e -> e.getKey().isDynamic())
-            .map(e -> e.getKey())
+        List<PropertyDescriptor> dynamic = 
context.getProperties().keySet().stream()
+            .filter(PropertyDescriptor::isDynamic)
             .collect(Collectors.toList());
 
-        Map<String, RecordPath> _temp = new HashMap<>();
+        Map<String, RecordPath> temp = new HashMap<>();

Review comment:
       Let's rename this to something more descriptive, like 
`tempRecordPathMappings`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -104,26 +103,25 @@ public void onEnabled(final ConfigurationContext context) 
{
         type  = 
context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
         mapper = new ObjectMapper();
 
-        List<PropertyDescriptor> dynamic = 
context.getProperties().entrySet().stream()
-            .filter( e -> e.getKey().isDynamic())
-            .map(e -> e.getKey())
+        List<PropertyDescriptor> dynamic = 
context.getProperties().keySet().stream()

Review comment:
       Let's take the opportunity to rename this to `dynamicDescriptors` or 
something similar.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -359,9 +358,21 @@ public DeleteOperationResponse deleteByQuery(final String 
query, final String in
         return new 
DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
     }
 
+
+    public UpdateOperationResponse updateByQuery(final String query, final 
String index, final String type, final Map<String, String> requestParameters) {
+        long start = System.currentTimeMillis();
+        Response response = runQuery("_update_by_query", query, index, type, 
requestParameters);
+        long end   = System.currentTimeMillis();

Review comment:
       Though it aligns the `System.currentTimeMillis()` calls, I think we 
should remove the extra spaces here.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -83,15 +83,14 @@
     private String type;
     private ObjectMapper mapper;
 
-    private final List<PropertyDescriptor> DESCRIPTORS;
+    private final List<PropertyDescriptor> descriptors;
 
     public ElasticSearchLookupService() {
-        List<PropertyDescriptor> _desc = new ArrayList<>();
-        _desc.addAll(super.getSupportedPropertyDescriptors());
-        _desc.add(CLIENT_SERVICE);
-        _desc.add(INDEX);
-        _desc.add(TYPE);
-        DESCRIPTORS = Collections.unmodifiableList(_desc);
+        List<PropertyDescriptor> desc = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        desc.add(CLIENT_SERVICE);
+        desc.add(INDEX);
+        desc.add(TYPE);
+        descriptors = Collections.unmodifiableList(desc);
     }
 
     private volatile ConcurrentHashMap<String, RecordPath> mappings;

Review comment:
       Let's move this up with the other variables, and rename it to something 
more descriptive, like `recordPathMappings`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class AbstractByQueryElasticsearch extends AbstractProcessor 
implements ElasticsearchRestProcessor {
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+        .description("If the \"by query\" operation fails, and a flowfile was 
read, it will be sent to this relationship.")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+        .description("If the \"by query\" operation succeeds, and a flowfile 
was read, it will be sent to this relationship.")
+        .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    private volatile ElasticSearchClientService clientService;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    abstract String tookAttribute();
+
+    abstract String errorAttribute();
+
+    abstract OperationResponse performOperation(final 
ElasticSearchClientService clientService, final String query,
+                                                final String index, final 
String type,
+                                                final Map<String, String> 
requestParameters);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {

Review comment:
       Could be `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
##########
@@ -290,22 +288,19 @@ private Record getByQuery(final Map<String, Object> 
query, Map<String, String> c
     }
 
     private Record applyMappings(Record record, Map<String, Object> source) {
-        Record _rec = new MapRecord(record.getSchema(), new HashMap<>());
+        Record rec = new MapRecord(record.getSchema(), new HashMap<>());
 
-        mappings.entrySet().forEach(entry -> {
+        mappings.forEach((key, path) -> {
             try {
-                Object o = JsonPath.read(source, entry.getKey());
-                RecordPath path = entry.getValue();
-                Optional<FieldValue> first = 
path.evaluate(_rec).getSelectedFields().findFirst();
-                if (first.isPresent()) {
-                    first.get().updateValue(o);
-                }
+                Object o = JsonPath.read(source, key);

Review comment:
       Could use `final` here, etc.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
##########
@@ -17,129 +17,48 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.delete.took", description 
= "The amount of time that it took to complete the delete operation in ms."),
+        @WritesAttribute(attribute = "elasticsearch.delete.error", description 
= "The error message provided by Elasticsearch if there is an error running the 
delete.")
+})
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
 @CapabilityDescription("Delete from an Elasticsearch index using a query. The 
query can be loaded from a flowfile body " +
         "or from the Query parameter.")
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
-@WritesAttributes({
-    @WritesAttribute(attribute = "elasticsearch.delete.took", description = 
"The amount of time that it took to complete the delete operation in ms."),
-    @WritesAttribute(attribute = "elasticsearch.delete.error", description = 
"The error message provided by Elasticsearch if there is an error running the 
delete.")
-})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
-        .description("If the delete by query fails, and a flowfile was read, 
it will be sent to this relationship.").build();
-
-    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
-        .description("If the delete by query succeeds, and a flowfile was 
read, it will be sent to this relationship.")
-        .build();
-
-
+@DynamicProperty(
+        name = "A URL query parameter",

Review comment:
       Rephrasing suggestion:
   ```suggestion
           name = "The name of a URL query parameter to add",
   ```

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
##########
@@ -141,14 +141,21 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters 
paginatedJsonQueryPara
             if (!newQuery && 
PAGINATION_SCROLL.getValue().equals(paginationType)) {
                 response = clientService.get().scroll(queryJson);
             } else {
+                Map<String, String> requestParameters = 
getUrlQueryParameters(context, input);

Review comment:
       `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -359,9 +358,21 @@ public DeleteOperationResponse deleteByQuery(final String 
query, final String in
         return new 
DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
     }
 
+
+    public UpdateOperationResponse updateByQuery(final String query, final 
String index, final String type, final Map<String, String> requestParameters) {
+        long start = System.currentTimeMillis();

Review comment:
       These variables could be `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class AbstractByQueryElasticsearch extends AbstractProcessor 
implements ElasticsearchRestProcessor {
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+        .description("If the \"by query\" operation fails, and a flowfile was 
read, it will be sent to this relationship.")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+        .description("If the \"by query\" operation succeeds, and a flowfile 
was read, it will be sent to this relationship.")
+        .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    private volatile ElasticSearchClientService clientService;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(QUERY);
+        descriptors.add(QUERY_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CLIENT_SERVICE);
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    abstract String tookAttribute();
+
+    abstract String errorAttribute();
+
+    abstract OperationResponse performOperation(final 
ElasticSearchClientService clientService, final String query,
+                                                final String index, final 
String type,
+                                                final Map<String, String> 
requestParameters);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = null;
+        if (context.hasIncomingConnection()) {
+            input = session.get();
+
+            if (input == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        try {
+            final String query = getQuery(input, context, session);
+            final String index = 
context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
+            final String type  = context.getProperty(TYPE).isSet()
+                    ? 
context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue()
+                    : null;
+            final String queryAttr = 
context.getProperty(QUERY_ATTRIBUTE).isSet()
+                    ? 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                    : null;
+
+            final OperationResponse or = performOperation(this.clientService, 
query, index, type, getUrlQueryParameters(context, input));
+
+            if (input == null) {
+                input = session.create();
+            }
+
+            final Map<String, String> attrs = new HashMap<>();
+            attrs.put(tookAttribute(), String.valueOf(or.getTook()));
+            if (!StringUtils.isBlank(queryAttr)) {
+                attrs.put(queryAttr, query);
+            }
+
+            input = session.putAllAttributes(input, attrs);
+
+            session.transfer(input, REL_SUCCESS);
+        } catch (Exception e) {

Review comment:
       `final`

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
##########
@@ -17,129 +17,48 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.elasticsearch.DeleteOperationResponse;
 import org.apache.nifi.elasticsearch.ElasticSearchClientService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.elasticsearch.OperationResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
+@WritesAttributes({
+        @WritesAttribute(attribute = "elasticsearch.delete.took", description 
= "The amount of time that it took to complete the delete operation in ms."),
+        @WritesAttribute(attribute = "elasticsearch.delete.error", description 
= "The error message provided by Elasticsearch if there is an error running the 
delete.")
+})
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "elastic", "elasticsearch", "delete", "query"})
 @CapabilityDescription("Delete from an Elasticsearch index using a query. The 
query can be loaded from a flowfile body " +
         "or from the Query parameter.")
-@Tags({ "elastic", "elasticsearch", "delete", "query"})
-@WritesAttributes({
-    @WritesAttribute(attribute = "elasticsearch.delete.took", description = 
"The amount of time that it took to complete the delete operation in ms."),
-    @WritesAttribute(attribute = "elasticsearch.delete.error", description = 
"The error message provided by Elasticsearch if there is an error running the 
delete.")
-})
-public class DeleteByQueryElasticsearch extends AbstractProcessor implements 
ElasticsearchRestProcessor {
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
-        .description("If the delete by query fails, and a flowfile was read, 
it will be sent to this relationship.").build();
-
-    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
-        .description("If the delete by query succeeds, and a flowfile was 
read, it will be sent to this relationship.")
-        .build();
-
-
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",

Review comment:
       Rephrasing suggestion:
   ```suggestion
           value = "The value of the URL query parameter",
   ```

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
##########
@@ -46,11 +48,15 @@
         "Elasticsearch JSON DSL. It does not automatically paginate queries 
for the user. If an incoming relationship is added to this " +
         "processor, it will use the flowfile's content for the query. Care 
should be taken on the size of the query because the entire response " +
         "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")

Review comment:
       Same comments as above

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
##########
@@ -48,6 +50,11 @@
 @CapabilityDescription("A processor that allows the user to run a paginated 
query (with aggregations) written with the Elasticsearch JSON DSL. " +
         "It will use the flowfile's content for the query unless the QUERY 
attribute is populated. " +
         "Search After/Point in Time queries must include a valid \"sort\" 
field.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")

Review comment:
       Same comments as above

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.junit.Assert
+import org.junit.Test
+
+abstract class AbstractByQueryElasticsearchTest {

Review comment:
       This looks like it could be easily converted to a Java test.  I know 
opinions are split on Groovy vs Java tests, but I'm going to recommend 
converting this one.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -414,8 +434,8 @@ private void removeBadRecordFlowFiles(List<FlowFile> bad, 
ProcessSession session
         bad.clear();
     }
 
-    private FlowFile indexDocuments(BulkOperation bundle, ProcessSession 
session, FlowFile input) throws Exception {
-        IndexOperationResponse response = 
clientService.bulk(bundle.getOperationList());
+    private FlowFile indexDocuments(BulkOperation bundle, ProcessContext 
context, ProcessSession session, FlowFile input) throws Exception {
+        IndexOperationResponse response = 
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, 
input));

Review comment:
       Consider `final` for the `response` local (and for the method parameters).

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -72,6 +74,11 @@
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", 
"index", "record"})
 @CapabilityDescription("A record-aware Elasticsearch put processor that uses 
the official Elastic REST client libraries.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")

Review comment:
       Same comments as above

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
##########
@@ -65,6 +66,11 @@
         "Search After/Point in Time queries must include a valid \"sort\" 
field. The processor will retrieve multiple pages of results " +
         "until either no more results are available or the Pagination Keep 
Alive expiration is reached, after which the query will " +
         "restart with the first page of results being retrieved.")
+@DynamicProperty(
+        name = "A URL query parameter",
+        value = "The value to set it to",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing")

Review comment:
       Same comments as above

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
##########
@@ -182,8 +181,8 @@ private Response runQuery(final String endpoint, final 
String query, final Strin
 
         final HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
         try {
-            return client.performRequest("POST", sb.toString(), 
requestParameters != null ? requestParameters : Collections.emptyMap(), 
queryEntity);
-        } catch (final Exception e) {
+            return performRequest("POST", sb.toString(), requestParameters, 
queryEntity);

Review comment:
       Sorry, looks like I clicked the wrong line.  I meant the `Exception e` in 
the `catch` clause, which used to be `final` but no longer is with this PR.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.junit.Assert
+import org.junit.Test
+
+abstract class AbstractByQueryElasticsearchTest {

Review comment:
       Fair enough, sounds good.

##########
File path: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
##########
@@ -426,22 +429,21 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) {
         session.transfer(input, REL_SUCCESS);
     }
 
-    private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession 
session) {
-        for (FlowFile badFlowFile : bad) {
+    private void removeBadRecordFlowFiles(final List<FlowFile> bad, final 
ProcessSession session) {
+        for (final FlowFile badFlowFile : bad) {
             session.remove(badFlowFile);
         }
 
         bad.clear();
     }
 
-    private FlowFile indexDocuments(BulkOperation bundle, ProcessContext 
context, ProcessSession session, FlowFile input) throws Exception {
-        IndexOperationResponse response = 
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, 
input));
+    private FlowFile indexDocuments(final BulkOperation bundle, final 
ProcessContext context, final ProcessSession session, final FlowFile input) 
throws Exception {
+        final IndexOperationResponse response = 
clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, 
input));
         if (response.hasErrors()) {
             if(logErrors || getLogger().isDebugEnabled()) {
-                List<Map<String, Object>> errors = response.getItems();
-                ObjectMapper mapper = new ObjectMapper();
-                mapper.enable(SerializationFeature.INDENT_OUTPUT);
-                String output = String.format("An error was encountered while 
processing bulk operations. Server response below:%n%n%s", 
mapper.writeValueAsString(errors));
+                final List<Map<String, Object>> errors = response.getItems();
+                objectMapper.enable(SerializationFeature.INDENT_OUTPUT);

Review comment:
       What do you think about putting the `enable()` call in the constructor?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to