This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1126776615 NIFI-14307 Closed Response Stream after reading in 
ElasticSearchClientServiceImpl (#9758)
1126776615 is described below

commit 11267766157c7a48f153a7cc29fe79a21f288e59
Author: Vijay Gorla <[email protected]>
AuthorDate: Thu Mar 6 16:57:45 2025 +1100

    NIFI-14307 Closed Response Stream after reading in 
ElasticSearchClientServiceImpl (#9758)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../ElasticSearchClientServiceImpl.java            | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 1fea28a3fa..213f4c49e8 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.commons.io.IOUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
@@ -257,7 +256,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
                     .explanation("""
                             Incorrect/invalid %s.
                             The HTTP Hosts should be valid URIs including 
protocol, domain and port for each entry.
-                            For example "https://elasticsearch1:9200, 
https://elasticsearch2:9200";.
+                            For example "https://elasticsearch1:9200, 
https://elasticsearch2:9200";.
                             
""".formatted(ElasticSearchClientService.HTTP_HOSTS.getDisplayName()));
         } catch (final InitializationException ie) {
             clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
@@ -597,10 +596,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
         try {
             if (code >= 200 && code < 300) {
-                final InputStream inputStream = 
response.getEntity().getContent();
-                final byte[] result = IOUtils.toByteArray(inputStream);
-                inputStream.close();
-                return mapper.readValue(new String(result, responseCharset), 
Map.class);
+                final String body = 
this.readContentAsString(response.getEntity(), this.responseCharset);
+                return mapper.readValue(body, Map.class);
             } else {
                 final String errorMessage = String.format("ElasticSearch 
reported an error while trying to run the query: %s",
                         response.getStatusLine().getReasonPhrase());
@@ -710,7 +707,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             final Response response = performRequest("POST", "/_bulk", 
requestParameters, entity);
             watch.stop();
 
-            final String rawResponse = 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+            final String rawResponse = 
this.readContentAsUtf8String(response.getEntity());
             parseResponseWarningHeaders(response);
 
             if (getLogger().isDebugEnabled()) {
@@ -751,7 +748,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             watch.stop();
 
             if (getLogger().isDebugEnabled()) {
-                getLogger().debug("Response for bulk delete: {}", 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
+                getLogger().debug("Response for bulk delete: {}", 
this.readContentAsUtf8String(response.getEntity()));
             }
 
             parseResponseWarningHeaders(response);
@@ -853,7 +850,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             endpoint.append("/").append(id);
 
             final Response response = performRequest("GET", 
endpoint.toString(), requestParameters, null);
-            final String body = 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+
+            final String body = 
this.readContentAsUtf8String(response.getEntity());
             parseResponseWarningHeaders(response);
 
             return (Map<String, Object>) mapper.readValue(body, 
Map.class).getOrDefault("_source", Collections.emptyMap());
@@ -909,7 +907,8 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             appendIndex(endpoint, index);
             endpoint.append("/_pit");
             final Response response = performRequest("POST", 
endpoint.toString(), params, null);
-            final String body = 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+
+            final String body = 
this.readContentAsUtf8String(response.getEntity());
             parseResponseWarningHeaders(response);
 
             if (getLogger().isDebugEnabled()) {
@@ -932,7 +931,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             watch.stop();
 
             if (getLogger().isDebugEnabled()) {
-                getLogger().debug("Response for deleting Point in Time: {}", 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
+                getLogger().debug("Response for deleting Point in Time: {}", 
this.readContentAsUtf8String(response.getEntity()));
             }
 
             parseResponseWarningHeaders(response);
@@ -958,7 +957,7 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
             watch.stop();
 
             if (getLogger().isDebugEnabled()) {
-                getLogger().debug("Response for deleting Scroll: {}", 
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
+                getLogger().debug("Response for deleting Scroll: {}", 
this.readContentAsUtf8String(response.getEntity()));
             }
 
             parseResponseWarningHeaders(response);
@@ -1070,4 +1069,14 @@ public class ElasticSearchClientServiceImpl extends 
AbstractControllerService im
 
         return client.performRequest(request);
     }
+
+    private String readContentAsUtf8String(HttpEntity entity) throws 
IOException {
+        return this.readContentAsString(entity, StandardCharsets.UTF_8);
+    }
+
+    private String readContentAsString(HttpEntity entity, Charset charset) 
throws IOException {
+        try (InputStream responseStream = entity.getContent()) {
+            return new String(responseStream.readAllBytes(), charset);
+        }
+    }
 }

Reply via email to