Repository: nifi Updated Branches: refs/heads/master 02071103d -> a91984446
NIFI-2567: Site-to-Site to send large data via HTTPS - It couldn't send data larger than about 7KB due to the mis-use of httpasyncclient library - Updated httpasyncclient from 4.1.1 to 4.1.2 - Let httpasyncclient framework to call produceContent multiple times as it gets ready to send more data via SSL session - Added HTTPS test cases to TestHttpClient, which failed without this fix Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9198444 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9198444 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9198444 Branch: refs/heads/master Commit: a919844461d63a26fa6c1d8c7daa447cd5ef912e Parents: 0207110 Author: Koji Kawamura <[email protected]> Authored: Sun Aug 14 22:01:52 2016 +0900 Committer: Mark Payne <[email protected]> Committed: Fri Aug 19 14:24:53 2016 -0400 ---------------------------------------------------------------------- nifi-commons/nifi-site-to-site-client/pom.xml | 2 +- .../remote/util/SiteToSiteRestApiClient.java | 65 ++++-- .../nifi/remote/client/http/TestHttpClient.java | 218 +++++++++++++++---- .../src/test/resources/certs/localhost-ks.jks | Bin 0 -> 3512 bytes .../src/test/resources/certs/localhost-ts.jks | Bin 0 -> 1816 bytes 5 files changed, 219 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml index 63e5c12..c1857b5 100644 --- a/nifi-commons/nifi-site-to-site-client/pom.xml +++ b/nifi-commons/nifi-site-to-site-client/pom.xml @@ -55,7 +55,7 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> - <version>4.1.1</version> + <version>4.1.2</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 8a379b7..d228378 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -112,6 +112,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -463,6 +464,8 @@ public class SiteToSiteRestApiClient implements Closeable { final HttpAsyncRequestProducer asyncRequestProducer = new HttpAsyncRequestProducer() { private final ByteBuffer buffer = ByteBuffer.allocate(DATA_PACKET_CHANNEL_READ_BUFFER_SIZE); + private int totalRead = 0; + private int totalProduced = 0; @Override public HttpHost getTarget() { @@ -485,45 +488,61 @@ public class SiteToSiteRestApiClient implements Closeable { return post; } + private final AtomicBoolean bufferHasRemainingData = new AtomicBoolean(false); + @Override public void produceContent(final ContentEncoder encoder, final IOControl ioControl) throws IOException { - int totalRead = 0; - int totalProduced = 0; + if (bufferHasRemainingData.get()) { + // If there's remaining buffer last time, send it first. + writeBuffer(encoder); + if (bufferHasRemainingData.get()) { + return; + } + } + int read; // This read() blocks until data becomes available, // or corresponding outputStream is closed. - while ((read = dataPacketChannel.read(buffer)) > -1) { + if ((read = dataPacketChannel.read(buffer)) > -1) { - buffer.flip(); - while (buffer.hasRemaining()) { - totalProduced += encoder.write(buffer); - } - buffer.clear(); logger.trace("Read {} bytes from dataPacketChannel. {}", read, flowFilesPath); totalRead += read; - } + buffer.flip(); + writeBuffer(encoder); - // There might be remaining bytes in buffer. Make sure it's fully drained. - buffer.flip(); - while (buffer.hasRemaining()) { - totalProduced += encoder.write(buffer); - } + } else { - final long totalWritten = commSession.getOutput().getBytesWritten(); - logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.", - flowFilesPath, totalProduced, totalRead, totalWritten); - if (totalRead != totalWritten || totalProduced != totalWritten) { - final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : $d : %d) were not equal. Something went wrong."; - throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten)); + final long totalWritten = commSession.getOutput().getBytesWritten(); + logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.", + flowFilesPath, totalProduced, totalRead, totalWritten); + if (totalRead != totalWritten || totalProduced != totalWritten) { + final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. Something went wrong."; + throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten)); + } + transferDataLatch.countDown(); + encoder.complete(); + dataPacketChannel.close(); } - transferDataLatch.countDown(); - encoder.complete(); - dataPacketChannel.close(); } + private void writeBuffer(ContentEncoder encoder) throws IOException { + while (buffer.hasRemaining()) { + final int written = encoder.write(buffer); + logger.trace("written {} bytes to encoder.", written); + if (written == 0) { + logger.trace("Buffer still has remaining. {}", buffer); + bufferHasRemainingData.set(true); + return; + } + totalProduced += written; + } + bufferHasRemainingData.set(false); + buffer.clear(); + } + @Override public void requestCompleted(final HttpContext context) { logger.debug("Sending data to {} completed.", flowFilesPath); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java index 3f6dd89..3a1b1bd 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java @@ -20,6 +20,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.KeystoreType; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.codec.StandardFlowFileCodec; import org.apache.nifi.remote.io.CompressionInputStream; @@ -39,9 +40,16 @@ import org.apache.nifi.web.api.entity.ControllerEntity; import org.apache.nifi.web.api.entity.PeersEntity; import org.apache.nifi.web.api.entity.TransactionResultEntity; import org.codehaus.jackson.map.ObjectMapper; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -83,11 +91,14 @@ public class TestHttpClient { private static Logger logger = LoggerFactory.getLogger(TestHttpClient.class); private static Server server; + private static ServerConnector httpConnector; + private static ServerConnector sslConnector; final private static AtomicBoolean isTestCaseFinished = new AtomicBoolean(false); private static Set<PortDTO> inputPorts; private static Set<PortDTO> outputPorts; private static Set<PeerDTO> peers; + private static Set<PeerDTO> peersSecure; private static String serverChecksum; public static class SiteInfoServlet extends HttpServlet { @@ -96,11 +107,18 @@ public class TestHttpClient { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { final ControllerDTO controller = new ControllerDTO(); - controller.setRemoteSiteHttpListeningPort(server.getURI().getPort()); + + if (req.getLocalPort() == httpConnector.getLocalPort()) { + controller.setRemoteSiteHttpListeningPort(httpConnector.getLocalPort()); + controller.setSiteToSiteSecure(false); + } else { + controller.setRemoteSiteHttpListeningPort(sslConnector.getLocalPort()); + controller.setSiteToSiteSecure(true); + } + controller.setId("remote-controller-id"); controller.setInstanceId("remote-instance-id"); controller.setName("Remote NiFi Flow"); - controller.setSiteToSiteSecure(false); assertNotNull("Test case should set <inputPorts> depending on the test scenario.", inputPorts); controller.setInputPorts(inputPorts); @@ -124,8 +142,13 @@ public class TestHttpClient { final PeersEntity peersEntity = new PeersEntity(); - assertNotNull("Test case should set <peers> depending on the test scenario.", peers); - peersEntity.setPeers(peers); + if (req.getLocalPort() == httpConnector.getLocalPort()) { + assertNotNull("Test case should set <peers> depending on the test scenario.", peers); + peersEntity.setPeers(peers); + } else { + assertNotNull("Test case should set <peersSecure> depending on the test scenario.", peersSecure); + peersEntity.setPeers(peersSecure); + } respondWithJson(resp, peersEntity); } @@ -383,6 +406,21 @@ public class TestHttpClient { ServletHandler servletHandler = new ServletHandler(); contextHandler.insertHandler(servletHandler); + SslContextFactory sslContextFactory = new SslContextFactory(); + sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks"); + sslContextFactory.setKeyStorePassword("localtest"); + sslContextFactory.setKeyStoreType("JKS"); + + httpConnector = new ServerConnector(server); + + HttpConfiguration https = new HttpConfiguration(); + https.addCustomizer(new SecureRequestCustomizer()); + sslConnector = new ServerConnector(server, + new SslConnectionFactory(sslContextFactory, "http/1.1"), + new HttpConnectionFactory(https)); + + server.setConnectors(new Connector[] { httpConnector, sslConnector }); + servletHandler.addServletWithMapping(SiteInfoServlet.class, "/site-to-site"); servletHandler.addServletWithMapping(PeersServlet.class, "/site-to-site/peers"); @@ -412,8 +450,7 @@ public class TestHttpClient { server.start(); - int serverPort = server.getURI().getPort(); - logger.info("Starting server on port {}", serverPort); + logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort()); } @AfterClass @@ -450,17 +487,26 @@ public class TestHttpClient { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "DEBUG"); final URI uri = server.getURI(); + isTestCaseFinished.set(false); + final PeerDTO peer = new PeerDTO(); - peer.setHostname(uri.getHost()); - peer.setPort(uri.getPort()); + peer.setHostname("localhost"); + peer.setPort(httpConnector.getLocalPort()); peer.setFlowFileCount(10); peer.setSecure(false); - isTestCaseFinished.set(false); - peers = new HashSet<>(); peers.add(peer); + final PeerDTO peerSecure = new PeerDTO(); + peerSecure.setHostname("localhost"); + peerSecure.setPort(sslConnector.getLocalPort()); + peerSecure.setFlowFileCount(10); + peerSecure.setSecure(true); + + peersSecure = new HashSet<>(); + peersSecure.add(peerSecure); + inputPorts = new HashSet<>(); final PortDTO runningInputPort = new PortDTO(); @@ -522,9 +568,20 @@ public class TestHttpClient { } private SiteToSiteClient.Builder getDefaultBuilder() { - final URI uri = server.getURI(); return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP) - .url("http://" + uri.getHost() + ":" + uri.getPort() + "/nifi") + .url("http://localhost:" + httpConnector.getLocalPort() + "/nifi") + ; + } + + private SiteToSiteClient.Builder getDefaultBuilderHTTPS() { + return new SiteToSiteClient.Builder().transportProtocol(SiteToSiteTransportProtocol.HTTP) + .url("https://localhost:" + sslConnector.getLocalPort() + "/nifi") + .keystoreFilename("src/test/resources/certs/localhost-ks.jks") + .keystorePass("localtest") + .keystoreType(KeystoreType.JKS) + .truststoreFilename("src/test/resources/certs/localhost-ts.jks") + .truststorePass("localtest") + .truststoreType(KeystoreType.JKS) ; } @@ -594,9 +651,6 @@ public class TestHttpClient { @Test public void testSendSuccess() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .portName("input-running") @@ -628,11 +682,94 @@ public class TestHttpClient { } @Test - public void testSendSuccessCompressed() throws Exception { + public void testSendSuccessHTTPS() throws Exception { - final URI uri = server.getURI(); + try ( + SiteToSiteClient client = getDefaultBuilderHTTPS() + .portName("input-running") + .build() + ) { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + + assertNotNull(transaction); + + serverChecksum = "1071206772"; + + + for (int i = 0; i < 20; i++) { + DataPacket packet = new DataPacketBuilder() + .contents("Example contents from client.") + .attr("Client attr 1", "Client attr 1 value") + .attr("Client attr 2", "Client attr 2 value") + .build(); + transaction.send(packet); + long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten(); + logger.info("{}: {} bytes have been written.", i, written); + } + + transaction.confirm(); + + transaction.complete(); + } + + } + + private static void testSendLargeFile(SiteToSiteClient client) throws IOException { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + + assertNotNull(transaction); + + serverChecksum = "1527414060"; + + final int contentSize = 10_000; + final StringBuilder sb = new StringBuilder(contentSize); + for (int i = 0; i < contentSize; i++) { + sb.append("a"); + } + + DataPacket packet = new DataPacketBuilder() + .contents(sb.toString()) + .attr("Client attr 1", "Client attr 1 value") + .attr("Client attr 2", "Client attr 2 value") + .build(); + transaction.send(packet); + long written = ((Peer)transaction.getCommunicant()).getCommunicationsSession().getBytesWritten(); + logger.info("{} bytes have been written.", written); + + transaction.confirm(); + + transaction.complete(); + } + + @Test + public void testSendLargeFileHTTP() throws Exception { + + try ( + SiteToSiteClient client = getDefaultBuilder() + .portName("input-running") + .build() + ) { + testSendLargeFile(client); + } + + } + + @Test + public void testSendLargeFileHTTPS() throws Exception { + + try ( + SiteToSiteClient client = getDefaultBuilderHTTPS() + .portName("input-running") + .build() + ) { + testSendLargeFile(client); + } + + } + + @Test + public void testSendSuccessCompressed() throws Exception { - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .portName("input-running") @@ -667,9 +804,6 @@ public class TestHttpClient { @Test public void testSendSlowClientSuccess() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .idleExpiration(1000, TimeUnit.MILLISECONDS) @@ -722,9 +856,6 @@ public class TestHttpClient { @Test public void testSendTimeout() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .timeout(1, TimeUnit.SECONDS) @@ -761,9 +892,6 @@ public class TestHttpClient { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote.protocol.http.HttpClientTransaction", "INFO"); - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .idleExpiration(500, TimeUnit.MILLISECONDS) @@ -822,9 +950,6 @@ public class TestHttpClient { @Test public void testReceiveSuccess() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .portName("output-running") @@ -844,11 +969,29 @@ public class TestHttpClient { } @Test - public void testReceiveSuccessCompressed() throws Exception { + public void testReceiveSuccessHTTPS() throws Exception { - final URI uri = server.getURI(); + try ( + SiteToSiteClient client = getDefaultBuilderHTTPS() + .portName("output-running") + .build() + ) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + + assertNotNull(transaction); + + DataPacket packet; + while ((packet = transaction.receive()) != null) { + consumeDataPacket(packet); + } + transaction.confirm(); + transaction.complete(); + } + } + + @Test + public void testReceiveSuccessCompressed() throws Exception { - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .portName("output-running") @@ -871,9 +1014,6 @@ public class TestHttpClient { @Test public void testReceiveSlowClientSuccess() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .portName("output-running") @@ -896,9 +1036,6 @@ public class TestHttpClient { @Test public void testReceiveTimeout() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .timeout(1, TimeUnit.SECONDS) @@ -918,9 +1055,6 @@ public class TestHttpClient { @Test public void testReceiveTimeoutAfterDataExchange() throws Exception { - final URI uri = server.getURI(); - - logger.info("uri={}", uri); try ( SiteToSiteClient client = getDefaultBuilder() .timeout(1, TimeUnit.SECONDS) http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks new file mode 100755 index 0000000..df36197 Binary files /dev/null and b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ks.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/a9198444/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks new file mode 100755 index 0000000..7824378 Binary files /dev/null and b/nifi-commons/nifi-site-to-site-client/src/test/resources/certs/localhost-ts.jks differ
