This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1126776615 NIFI-14307 Closed Response Stream after reading in
ElasticSearchClientServiceImpl (#9758)
1126776615 is described below
commit 11267766157c7a48f153a7cc29fe79a21f288e59
Author: Vijay Gorla <[email protected]>
AuthorDate: Thu Mar 6 16:57:45 2025 +1100
NIFI-14307 Closed Response Stream after reading in
ElasticSearchClientServiceImpl (#9758)
Signed-off-by: David Handermann <[email protected]>
---
.../ElasticSearchClientServiceImpl.java | 33 ++++++++++++++--------
1 file changed, 21 insertions(+), 12 deletions(-)
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 1fea28a3fa..213f4c49e8 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
@@ -257,7 +256,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
.explanation("""
Incorrect/invalid %s.
The HTTP Hosts should be valid URIs including
protocol, domain and port for each entry.
- For example "https://elasticsearch1:9200,
https://elasticsearch2:9200".
+ For example "https://elasticsearch1:9200,
https://elasticsearch2:9200".
""".formatted(ElasticSearchClientService.HTTP_HOSTS.getDisplayName()));
} catch (final InitializationException ie) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
@@ -597,10 +596,8 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
try {
if (code >= 200 && code < 300) {
- final InputStream inputStream =
response.getEntity().getContent();
- final byte[] result = IOUtils.toByteArray(inputStream);
- inputStream.close();
- return mapper.readValue(new String(result, responseCharset),
Map.class);
+ final String body =
this.readContentAsString(response.getEntity(), this.responseCharset);
+ return mapper.readValue(body, Map.class);
} else {
final String errorMessage = String.format("ElasticSearch
reported an error while trying to run the query: %s",
response.getStatusLine().getReasonPhrase());
@@ -710,7 +707,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
final Response response = performRequest("POST", "/_bulk",
requestParameters, entity);
watch.stop();
- final String rawResponse =
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+ final String rawResponse =
this.readContentAsUtf8String(response.getEntity());
parseResponseWarningHeaders(response);
if (getLogger().isDebugEnabled()) {
@@ -751,7 +748,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
watch.stop();
if (getLogger().isDebugEnabled()) {
- getLogger().debug("Response for bulk delete: {}",
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
+ getLogger().debug("Response for bulk delete: {}",
this.readContentAsUtf8String(response.getEntity()));
}
parseResponseWarningHeaders(response);
@@ -853,7 +850,8 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
endpoint.append("/").append(id);
final Response response = performRequest("GET",
endpoint.toString(), requestParameters, null);
- final String body =
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+
+ final String body =
this.readContentAsUtf8String(response.getEntity());
parseResponseWarningHeaders(response);
return (Map<String, Object>) mapper.readValue(body,
Map.class).getOrDefault("_source", Collections.emptyMap());
@@ -909,7 +907,8 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
appendIndex(endpoint, index);
endpoint.append("/_pit");
final Response response = performRequest("POST",
endpoint.toString(), params, null);
- final String body =
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+
+ final String body =
this.readContentAsUtf8String(response.getEntity());
parseResponseWarningHeaders(response);
if (getLogger().isDebugEnabled()) {
@@ -932,7 +931,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
watch.stop();
if (getLogger().isDebugEnabled()) {
- getLogger().debug("Response for deleting Point in Time: {}",
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
+ getLogger().debug("Response for deleting Point in Time: {}",
this.readContentAsUtf8String(response.getEntity()));
}
parseResponseWarningHeaders(response);
@@ -958,7 +957,7 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
watch.stop();
if (getLogger().isDebugEnabled()) {
- getLogger().debug("Response for deleting Scroll: {}",
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8));
+ getLogger().debug("Response for deleting Scroll: {}",
this.readContentAsUtf8String(response.getEntity()));
}
parseResponseWarningHeaders(response);
@@ -1070,4 +1069,14 @@ public class ElasticSearchClientServiceImpl extends
AbstractControllerService im
return client.performRequest(request);
}
+
+ private String readContentAsUtf8String(HttpEntity entity) throws
IOException {
+ return this.readContentAsString(entity, StandardCharsets.UTF_8);
+ }
+
+ private String readContentAsString(HttpEntity entity, Charset charset)
throws IOException {
+ try (InputStream responseStream = entity.getContent()) {
+ return new String(responseStream.readAllBytes(), charset);
+ }
+ }
}