This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1b0e3865e9ba160bf3b68e33f1938bc6c03229bd Author: Matthew Burgess <[email protected]> AuthorDate: Fri Feb 21 16:20:38 2020 -0500 NIFI-7114: Fix file leaks in StandardCommsSession and S2S Reporting components Signed-off-by: Joe Witt <[email protected]> --- .../nifi/reporting/SiteToSiteBulletinReportingTask.java | 14 +++++++++++--- .../nifi/reporting/SiteToSiteMetricsReportingTask.java | 12 ++++++++++-- .../reporting/SiteToSiteProvenanceReportingTask.java | 14 +++++++++++--- .../nifi/reporting/SiteToSiteStatusReportingTask.java | 14 +++++++++++--- .../reporting/sink/SiteToSiteReportingRecordSink.java | 16 ++++++++++------ .../distributed/cache/client/StandardCommsSession.java | 2 ++ 6 files changed, 55 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 3e07759..1e68687 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -137,11 +137,12 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting final JsonArray jsonArray = arrayBuilder.build(); // Send the JSON document for the current batch + Transaction transaction = null; try { // Lazily create SiteToSiteClient to provide a StateManager setup(context); - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { getLogger().info("All destination nodes are penalized; will attempt to send data later"); return; @@ -162,8 +163,15 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); getLogger().info("Successfully sent {} Bulletins to destination in {} ms; Transaction ID = {}; First Event ID = {}", new Object[]{bulletins.size(), transferMillis, transactionId, bulletins.get(0).getId()}); - } catch (final IOException e) { - throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e); + } catch (final Exception e) { + if (transaction != null) { + transaction.error(); + } + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e); + } } lastSentBulletinId = currMaxId; diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index e781dfa..898bbbd 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -191,12 +191,13 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes); } + Transaction transaction = null; try { // Lazily create SiteToSiteClient to provide a StateManager setup(context); long start = System.nanoTime(); - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { getLogger().debug("All destination nodes are penalized; will attempt to send data later"); return; @@ -215,7 +216,14 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId}); } catch (final Exception e) { - throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e); + if (transaction != null) { + transaction.error(); + } + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e); + } } } else { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 0ff507c..d382ca3 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -304,11 +304,12 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final JsonArray jsonArray = arrayBuilder.build(); // Send the JSON document for the current batch + Transaction transaction = null; try { // Lazily create SiteToSiteClient to provide a StateManager setup(context); - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { // Throw an exception to avoid provenance event id will not proceed so that those can be consumed again. throw new ProcessException("All destination nodes are penalized; will attempt to send data later"); @@ -329,8 +330,15 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()}); - } catch (final IOException e) { - throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + } catch (final Exception e) { + if (transaction != null) { + transaction.error(); + } + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + } } }); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 31009f8..7464b43 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -161,12 +161,13 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa while(!jsonBatch.isEmpty()) { // Send the JSON document for the current batch + Transaction transaction = null; try { // Lazily create SiteToSiteClient to provide a StateManager setup(context); long start = System.nanoTime(); - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { getLogger().debug("All destination nodes are penalized; will attempt to send data later"); return; @@ -197,8 +198,15 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa fromIndex = toIndex; toIndex = Math.min(fromIndex + batchSize, jsonArray.size()); jsonBatch = jsonArray.subList(fromIndex, toIndex); - } catch (final IOException e) { - throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e); + } catch (final Exception e) { + if (transaction != null) { + transaction.error(); + } + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e); + } } } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java index 20c501e..b2ed107 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java @@ -135,10 +135,10 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp @Override public WriteResult sendData(final RecordSet recordSet, final Map<String,String> attributes, final boolean sendZeroResults) throws IOException { - + Transaction transaction = null; try { WriteResult writeResult = null; - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { getLogger().info("All destination nodes are penalized; will attempt to send data later"); } else { @@ -166,12 +166,16 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp } } return writeResult; - } catch(IOException ioe) { - throw ioe; } catch (Exception e) { - throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e); + if (transaction != null) { + transaction.error(); + } + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e); + } } - } @OnDisabled diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java index d157161..56e3389 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -71,6 +71,8 @@ public class StandardCommsSession implements CommsSession { @Override public void close() throws IOException { socketChannel.close(); + bufferedIn.close(); + bufferedOut.close(); } @Override
