NIFI-1073 - completed more tests, rebased on recent master
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1e6c10fe Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1e6c10fe Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1e6c10fe Branch: refs/heads/NIFI-1073 Commit: 1e6c10fe6916b56f12e76c1c0ac89c9368deea52 Parents: 4e8e2d0 Author: Tony Kurc <[email protected]> Authored: Sun Nov 8 22:44:49 2015 -0500 Committer: Tony Kurc <[email protected]> Committed: Sun Nov 8 22:44:49 2015 -0500 ---------------------------------------------------------------------- .../stream/io/TestLeakyBucketThrottler.java | 48 ++++++------ .../cluster/manager/impl/WebClusterManager.java | 13 ++-- .../repository/VolatileContentRepository.java | 9 ++- .../org/apache/nifi/web/server/JettyServer.java | 75 ++++++++----------- .../hadoop/TestCreateHadoopSequenceFile.java | 79 +++++++++++--------- .../processors/standard/ConvertJSONToSQL.java | 30 ++++---- .../nifi/processors/standard/GetHTTP.java | 9 ++- .../processors/standard/ListFileTransfer.java | 9 ++- .../nifi/processors/standard/PostHTTP.java | 2 + .../processors/standard/TestTransformXml.java | 30 ++++---- .../pom.xml | 4 + .../DistributedSetCacheClientService.java | 29 +++---- 12 files changed, 175 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java index 52bd8de..e1d6ce0 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java +++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java @@ -41,15 +41,16 @@ public class TestLeakyBucketThrottler { final byte[] data = new byte[1024 * 1024 * 4]; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final OutputStream throttledOut = throttler.newThrottledOutputStream(baos); - - final long start = System.currentTimeMillis(); - throttledOut.write(data); - throttler.close(); - final long millis = System.currentTimeMillis() - start; - // should take 4 sec give or take - assertTrue(millis > 3000); - assertTrue(millis < 6000); + try (final OutputStream throttledOut = throttler.newThrottledOutputStream(baos) ){ + + final long start = System.currentTimeMillis(); + throttledOut.write(data); + throttler.close(); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); + } } @Test(timeout = 10000) @@ -58,23 +59,22 @@ public class TestLeakyBucketThrottler { final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); final byte[] data = new byte[1024 * 1024 * 4]; - final ByteArrayInputStream bais = new ByteArrayInputStream(data); - final InputStream throttledIn = throttler.newThrottledInputStream(bais); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try ( final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final InputStream throttledIn = throttler.newThrottledInputStream(bais); + final ByteArrayOutputStream baos = new ByteArrayOutputStream() ){ + + final byte[] buffer = new byte[4096]; + final long start = System.currentTimeMillis(); + int len; + while ((len = throttledIn.read(buffer)) > 0) { + baos.write(buffer, 0, len); + } - final byte[] buffer = new byte[4096]; - final long start = System.currentTimeMillis(); - int len; - while ((len = throttledIn.read(buffer)) > 0) { - baos.write(buffer, 0, len); + final long millis = 
System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); } - throttler.close(); - final long millis = System.currentTimeMillis() - start; - // should take 4 sec give or take - assertTrue(millis > 3000); - assertTrue(millis < 6000); - baos.close(); } @Test(timeout = 10000) http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 01f11b1..a354647 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -3402,16 +3402,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C completionService.submit(new Runnable() { @Override public void run() { - final OutputStream drain = new OutputStream() { - @Override - public void write(final int b) { /* drain response */ } - }; try { - ((StreamingOutput) nodeResponse.getResponse().getEntity()).write(drain); + try (final OutputStream drain = new OutputStream() { + @Override + public void write(final int b) { /* drain response */ } + }) { + ((StreamingOutput) nodeResponse.getResponse().getEntity()).write(drain); + } } catch (final IOException | WebApplicationException ex) { logger.info("Failed clearing out non-client response buffer 
due to: " + ex, ex); - } finally { - FileUtils.closeQuietly(drain); } } }, null); http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 0451812..29870a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -401,16 +401,17 @@ public class VolatileContentRepository implements ContentRepository { @Override public long exportTo(ContentClaim claim, OutputStream destination) throws IOException { - final InputStream in = read(claim); - return StreamUtils.copy(in, destination); + try (final InputStream in = read(claim) ){ + return StreamUtils.copy(in, destination); + } } @Override public long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException { final InputStream in = read(claim); try { - StreamUtils.skip(in, offset); - StreamUtils.copy(in, destination, length); + StreamUtils.skip(in, offset); + StreamUtils.copy(in, destination, length); } finally { FileUtils.closeQuietly(in); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index 73cf7c5..983ffd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -375,24 +375,25 @@ public class JettyServer implements NiFiServer { } // get an input stream for the nifi-processor configuration file - BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))); - - // read in each configured type - String rawComponentType; - while ((rawComponentType = in.readLine()) != null) { - // extract the component type - final String componentType = extractComponentType(rawComponentType); - if (componentType != null) { - List<String> extensions = uiExtensions.get(uiExtensionType); - - // if there are currently no extensions for this type create it - if (extensions == null) { - extensions = new ArrayList<>(); - uiExtensions.put(uiExtensionType, extensions); - } + try (BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry)))) { + + // read in each configured type + String rawComponentType; + while ((rawComponentType = in.readLine()) != null) { + // extract the component type + final String componentType = extractComponentType(rawComponentType); + if (componentType != null) { + List<String> extensions = uiExtensions.get(uiExtensionType); + + // if there are currently no extensions for this type create it + if (extensions == null) { + extensions = new ArrayList<>(); + uiExtensions.put(uiExtensionType, extensions); + } - // 
add the specified type - extensions.add(componentType); + // add the specified type + extensions.add(componentType); + } } } } @@ -440,40 +441,30 @@ public class JettyServer implements NiFiServer { */ private List<String> getWarExtensions(final File war, final String path) { List<String> processorTypes = new ArrayList<>(); - JarFile jarFile = null; - BufferedReader in = null; - try { - // load the jar file and attempt to find the nifi-processor entry - jarFile = new JarFile(war); + + // load the jar file and attempt to find the nifi-processor entry + try (JarFile jarFile = new JarFile(war)) { JarEntry jarEntry = jarFile.getJarEntry(path); // ensure the nifi-processor entry was found if (jarEntry != null) { // get an input stream for the nifi-processor configuration file - in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))); - - // read in each configured type - String rawProcessorType; - while ((rawProcessorType = in.readLine()) != null) { - // extract the processor type - final String processorType = extractComponentType(rawProcessorType); - if (processorType != null) { - processorTypes.add(processorType); + try (final BufferedReader in = new BufferedReader(new InputStreamReader(jarFile.getInputStream(jarEntry))) ){ + + // read in each configured type + String rawProcessorType; + while ((rawProcessorType = in.readLine()) != null) { + // extract the processor type + final String processorType = extractComponentType(rawProcessorType); + if (processorType != null) { + processorTypes.add(processorType); + } } } } } catch (IOException ioe) { - logger.warn(String.format("Unable to inspect %s for a custom processor UI.", war)); - } finally { - // close the jar file - which closes all input streams obtained via getInputStream above - if (jarFile != null) { - FileUtils.closeQuietly(jarFile); - } - // close the BufferedReader, this may not be strictly necessary - if (in != null){ - FileUtils.closeQuietly(in); - } - } + logger.warn("Unable to 
inspect {} for a custom processor UI.", new Object[]{war, ioe}); + } return processorTypes; } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java index 430b4fa..30d386a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java @@ -71,9 +71,11 @@ public class TestCreateHadoopSequenceFile { } @Test - public void testSimpleCase() throws FileNotFoundException { + public void testSimpleCase() throws IOException { for (File inFile : inFiles) { - controller.enqueue(new FileInputStream(inFile)); + try (FileInputStream fin = new FileInputStream(inFile) ) { + controller.enqueue(fin); + } } controller.run(3); @@ -88,7 +90,9 @@ public class TestCreateHadoopSequenceFile { @Test public void testSequenceFileSaysValueIsBytesWritable() throws UnsupportedEncodingException, IOException { for (File inFile : inFiles) { - controller.enqueue(new FileInputStream(inFile)); + try (FileInputStream fin = new FileInputStream(inFile) ){ + controller.enqueue(fin); + } } controller.run(3); @@ -118,35 +122,39 @@ public class TestCreateHadoopSequenceFile { } @Test - public void testMergedTarData() throws FileNotFoundException { + public void testMergedTarData() throws IOException { Map<String, String> attributes = new HashMap<>(); 
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/tar"); - controller.enqueue(new FileInputStream("src/test/resources/testdata/13545312236534130.tar"), attributes); - controller.run(); - List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); - assertEquals(1, successSeqFiles.size()); - final byte[] data = successSeqFiles.iterator().next().toByteArray(); - // Data should be greater than 1000000 because that's the size of 2 of our input files, - // and the file size should contain all of that plus headers, but the headers should only - // be a couple hundred bytes. - assertTrue(data.length > 1000000); - assertTrue(data.length < 1501000); + try (final FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545312236534130.tar")) { + controller.enqueue(fin, attributes); + controller.run(); + List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); + assertEquals(1, successSeqFiles.size()); + final byte[] data = successSeqFiles.iterator().next().toByteArray(); + // Data should be greater than 1000000 because that's the size of 2 of our input files, + // and the file size should contain all of that plus headers, but the headers should only + // be a couple hundred bytes. 
+ assertTrue(data.length > 1000000); + assertTrue(data.length < 1501000); + } } @Test public void testMergedZipData() throws IOException { Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip"); - controller.enqueue(new FileInputStream("src/test/resources/testdata/13545423550275052.zip"), attributes); - controller.run(); - List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); - assertEquals(1, successSeqFiles.size()); - final byte[] data = successSeqFiles.iterator().next().toByteArray(); - // Data should be greater than 1000000 because that's the size of 2 of our input files, - // and the file size should contain all of that plus headers, but the headers should only - // be a couple hundred bytes. - assertTrue(data.length > 1000000); - assertTrue(data.length < 1501000); + try (FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545423550275052.zip")){ + controller.enqueue(fin, attributes); + controller.run(); + List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); + assertEquals(1, successSeqFiles.size()); + final byte[] data = successSeqFiles.iterator().next().toByteArray(); + // Data should be greater than 1000000 because that's the size of 2 of our input files, + // and the file size should contain all of that plus headers, but the headers should only + // be a couple hundred bytes. 
+ assertTrue(data.length > 1000000); + assertTrue(data.length < 1501000); + } // FileOutputStream fos = new FileOutputStream("zip-3-randoms.sf"); // fos.write(data); // fos.flush(); @@ -157,16 +165,19 @@ public class TestCreateHadoopSequenceFile { public void testMergedFlowfilePackagedData() throws IOException { Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/flowfile-v3"); - controller.enqueue(new FileInputStream("src/test/resources/testdata/13545479542069498.pkg"), attributes); - controller.run(); - List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); - assertEquals(1, successSeqFiles.size()); - final byte[] data = successSeqFiles.iterator().next().toByteArray(); - // Data should be greater than 1000000 because that's the size of 2 of our input files, - // and the file size should contain all of that plus headers, but the headers should only - // be a couple hundred bytes. - assertTrue(data.length > 1000000); - assertTrue(data.length < 1501000); + try ( final FileInputStream fin = new FileInputStream("src/test/resources/testdata/13545479542069498.pkg")) { + controller.enqueue(fin, attributes); + + controller.run(); + List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS); + assertEquals(1, successSeqFiles.size()); + final byte[] data = successSeqFiles.iterator().next().toByteArray(); + // Data should be greater than 1000000 because that's the size of 2 of our input files, + // and the file size should contain all of that plus headers, but the headers should only + // be a couple hundred bytes. 
+ assertTrue(data.length > 1000000); + assertTrue(data.length < 1501000); + } // FileOutputStream fos = new FileOutputStream("flowfilePkg-3-randoms.sf"); // fos.write(data); // fos.flush(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 9591960..6f14800 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -561,25 +561,27 @@ public class ConvertJSONToSQL extends AbstractProcessor { public static TableSchema from(final Connection conn, final String catalog, final String tableName, final boolean translateColumnNames, final boolean includePrimaryKeys) throws SQLException { - final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%"); + try (final ResultSet colrs = conn.getMetaData().getColumns(catalog, null, tableName, "%")) { - final List<ColumnDescription> cols = new ArrayList<>(); - while (colrs.next()) { - final ColumnDescription col = ColumnDescription.from(colrs); - cols.add(col); - } + final List<ColumnDescription> cols = new ArrayList<>(); + while (colrs.next()) { + final ColumnDescription col = ColumnDescription.from(colrs); + cols.add(col); + } - final Set<String> primaryKeyColumns = new HashSet<>(); - if (includePrimaryKeys) { - final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, 
tableName); + final Set<String> primaryKeyColumns = new HashSet<>(); + if (includePrimaryKeys) { + try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) { - while (pkrs.next()) { - final String colName = pkrs.getString("COLUMN_NAME"); - primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); + while (pkrs.next()) { + final String colName = pkrs.getString("COLUMN_NAME"); + primaryKeyColumns.add(normalizeColumnName(colName, translateColumnNames)); + } + } } - } - return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + return new TableSchema(cols, translateColumnNames, primaryKeyColumns); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 48ca2de..06d7182 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -49,13 +49,13 @@ import java.util.regex.Pattern; import javax.net.ssl.SSLContext; +import org.apache.commons.io.IOUtils; import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import 
org.apache.http.client.methods.HttpGet; import org.apache.http.config.Registry; @@ -67,6 +67,7 @@ import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.SSLContexts; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -427,7 +428,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { } // create the http client - final HttpClient client = clientBuilder.build(); + final CloseableHttpClient client = clientBuilder.build(); // create request final HttpGet get = new HttpGet(url); @@ -527,9 +528,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { session.rollback(); logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t); throw t; + } finally { + IOUtils.closeQuietly(client); } + } finally { + conMan.shutdown(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index ce344ed..994ccf0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java 
@@ -93,8 +93,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> { @Override protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { final FileTransfer transfer = getFileTransfer(context); - final List<FileInfo> listing = transfer.getListing(); - FileUtils.closeQuietly(transfer); + final List<FileInfo> listing; + try { + listing = transfer.getListing(); + } finally { + FileUtils.closeQuietly(transfer); + } + if (minTimestamp == null) { return listing; } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index ef84629..9a7f0f4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -49,6 +49,7 @@ import javax.net.ssl.SSLSession; import javax.security.cert.X509Certificate; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.compress.utils.IOUtils; import org.apache.http.Header; import org.apache.http.HttpException; import org.apache.http.HttpHost; @@ -637,6 +638,7 @@ public class PostHTTP extends AbstractProcessor { + "configured to deliver FlowFiles; rolling back session", new Object[]{url}); session.rollback(); context.yield(); + IOUtils.closeQuietly(client); return; } } else { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java index 7074ec9..7239f01 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformXml.java @@ -93,23 +93,25 @@ public class TestTransformXml { StringBuilder builder = new StringBuilder(); builder.append("<data>\n"); - InputStream in = new FileInputStream(new File("src/test/resources/TestTransformXml/tokens.csv")); - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + try(BufferedReader reader = new BufferedReader(new InputStreamReader( + new FileInputStream(new File("src/test/resources/TestTransformXml/tokens.csv"))))){ - String line = null; - while ((line = reader.readLine()) != null) { - builder.append(line).append("\n"); - } - builder.append("</data>"); - String data = builder.toString(); - runner.enqueue(data.getBytes(), attributes); - runner.run(); - runner.assertAllFlowFilesTransferred(TransformXml.REL_SUCCESS); - final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformXml.REL_SUCCESS).get(0); - final String transformedContent = new String(transformed.toByteArray(), StandardCharsets.ISO_8859_1); + String line = null; + while ((line = reader.readLine()) != null) { + builder.append(line).append("\n"); + } + builder.append("</data>"); + String data = 
builder.toString(); + runner.enqueue(data.getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(TransformXml.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformXml.REL_SUCCESS).get(0); + final String transformedContent = new String(transformed.toByteArray(), StandardCharsets.ISO_8859_1); - transformed.assertContentEquals(Paths.get("src/test/resources/TestTransformXml/tokens.xml")); + transformed.assertContentEquals(Paths.get("src/test/resources/TestTransformXml/tokens.xml")); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml index e05f4dc..e335b1e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/pom.xml @@ -47,5 +47,9 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/1e6c10fe/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index 8c95c77..fd855e4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -127,10 +128,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService try { ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); } catch (final HandshakeException e) { - try { - session.close(); - } catch (final IOException ioe) { - } + IOUtils.closeQuietly(session); throw new IOException(e); } @@ -162,9 +160,9 @@ public class DistributedSetCacheClientService extends AbstractControllerService try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) { dos.writeUTF("close"); dos.flush(); - commsSession.close(); } catch (final IOException e) { } + IOUtils.closeQuietly(commsSession); } if (logger.isDebugEnabled() && 
getIdentifier() != null) { logger.debug("Closed {}", new Object[]{getIdentifier()}); @@ -185,6 +183,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService } final CommsSession session = leaseCommsSession(); + boolean sessionError = false; try { final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); dos.writeUTF(methodName); @@ -198,22 +197,14 @@ public class DistributedSetCacheClientService extends AbstractControllerService final DataInputStream dis = new DataInputStream(session.getInputStream()); return dis.readBoolean(); } catch (final IOException ioe) { - try { - session.close(); - } catch (final IOException ignored) { - } - + sessionError = true; throw ioe; } finally { - if (!session.isClosed()) { - if (this.closed) { - try { - session.close(); - } catch (final IOException ioe) { - } - } else { - queue.offer(session); - } + if (sessionError == false && this.closed == false) { + queue.offer(session); + } + else{ + IOUtils.closeQuietly(session); } } }
