Repository: nifi
Updated Branches:
  refs/heads/master 16e56ccfc -> a2f2ddd6b


NIFI-4699 Use a filter in PostHTTP to pull flowfiles from the queue whose URL is the same

This closes #2412.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a2f2ddd6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a2f2ddd6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a2f2ddd6

Branch: refs/heads/master
Commit: a2f2ddd6b82ab611e34301e67acf30b30b5a5964
Parents: 16e56cc
Author: Mike Moser <[email protected]>
Authored: Tue Jan 9 15:56:54 2018 +0000
Committer: Mike Moser <[email protected]>
Committed: Thu Feb 8 18:36:11 2018 +0000

----------------------------------------------------------------------
 .../nifi/processors/standard/PostHTTP.java      | 226 +++++++++----------
 .../nifi/processors/standard/TestPostHTTP.java  | 118 ++++++++++
 2 files changed, 229 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a2f2ddd6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index d5344fa..00e51d2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -66,6 +66,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -76,8 +77,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.util.CertificateUtils;
 import org.apache.nifi.security.util.KeyStoreUtils;
 import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.GZIPOutputStream;
 import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
 import org.apache.nifi.stream.io.StreamThrottler;
@@ -95,6 +94,8 @@ import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response.Status;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -121,6 +122,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
@@ -449,6 +451,26 @@ public class PostHTTP extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile firstFlowFile = session.get();
+        if (firstFlowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+        final String url = 
context.getProperty(URL).evaluateAttributeExpressions(firstFlowFile).getValue();
+        try {
+            new java.net.URL(url);
+        } catch (final MalformedURLException e) {
+            logger.error("After substituting attribute values for {}, URL is 
{}; this is not a valid URL, so routing to failure",
+                    new Object[]{firstFlowFile, url});
+            firstFlowFile = session.penalize(firstFlowFile);
+            session.transfer(firstFlowFile, REL_FAILURE);
+            return;
+        }
+
+        final List<FlowFile> toSend = new ArrayList<>();
+        toSend.add(firstFlowFile);
+
         final boolean sendAsFlowFile = 
context.getProperty(SEND_AS_FLOWFILE).asBoolean();
         final int compressionLevel = 
context.getProperty(COMPRESSION_LEVEL).asInteger();
         final String userAgent = context.getProperty(USER_AGENT).getValue();
@@ -461,141 +483,115 @@ public class PostHTTP extends AbstractProcessor {
         final RequestConfig requestConfig = requestConfigBuilder.build();
 
         final StreamThrottler throttler = throttlerRef.get();
-        final ComponentLog logger = getLogger();
 
         final Double maxBatchBytes = 
context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B);
-        String lastUrl = null;
-        long bytesToSend = 0L;
+        final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize());
 
-        final List<FlowFile> toSend = new ArrayList<>();
         DestinationAccepts destinationAccepts = null;
         CloseableHttpClient client = null;
         final String transactionId = UUID.randomUUID().toString();
 
         final AtomicReference<String> dnHolder = new AtomicReference<>("none");
-        while (true) {
-            FlowFile flowFile = session.get();
-            if (flowFile == null) {
-                break;
-            }
 
-            final String url = 
context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue();
-            try {
-                new java.net.URL(url);
-            } catch (final MalformedURLException e) {
-                logger.error("After substituting attribute values for {}, URL 
is {}; this is not a valid URL, so routing to failure",
-                        new Object[]{flowFile, url});
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-                continue;
-            }
-
-            // If this FlowFile doesn't have the same url, throw it back on 
the queue and stop grabbing FlowFiles
-            if (lastUrl != null && !lastUrl.equals(url)) {
-                session.transfer(flowFile);
-                break;
-            }
+        final Config config = getConfig(url, context);
+        final HttpClientConnectionManager conMan = 
config.getConnectionManager();
 
-            lastUrl = url;
-            toSend.add(flowFile);
-
-            if (client == null || destinationAccepts == null) {
-                final Config config = getConfig(url, context);
-                final HttpClientConnectionManager conMan = 
config.getConnectionManager();
-
-                final HttpClientBuilder clientBuilder = 
HttpClientBuilder.create();
-                clientBuilder.setConnectionManager(conMan);
-                clientBuilder.setUserAgent(userAgent);
-                clientBuilder.addInterceptorFirst(new 
HttpResponseInterceptor() {
-                    @Override
-                    public void process(final HttpResponse response, final 
HttpContext httpContext) throws HttpException, IOException {
-                        final HttpCoreContext coreContext = 
HttpCoreContext.adapt(httpContext);
-                        final ManagedHttpClientConnection conn = 
coreContext.getConnection(ManagedHttpClientConnection.class);
-                        if (!conn.isOpen()) {
-                            return;
-                        }
-
-                        final SSLSession sslSession = conn.getSSLSession();
-
-                        if (sslSession != null) {
-                            final Certificate[] certChain = 
sslSession.getPeerCertificates();
-                            if (certChain == null || certChain.length == 0) {
-                                throw new SSLPeerUnverifiedException("No 
certificates found");
-                            }
-
-                            try {
-                                final X509Certificate cert = 
CertificateUtils.convertAbstractX509Certificate(certChain[0]);
-                                
dnHolder.set(cert.getSubjectDN().getName().trim());
-                            } catch (CertificateException e) {
-                                final String msg = "Could not extract subject 
DN from SSL session peer certificate";
-                                logger.warn(msg);
-                                throw new SSLPeerUnverifiedException(msg);
-                            }
-                        }
-                    }
-                });
-
-                clientBuilder.disableAutomaticRetries();
-                clientBuilder.disableContentCompression();
-
-                final String username = 
context.getProperty(USERNAME).getValue();
-                final String password = 
context.getProperty(PASSWORD).getValue();
-                // set the credentials if appropriate
-                if (username != null) {
-                    final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-                    if (password == null) {
-                        credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username));
-                    } else {
-                        credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username, password));
-                    }
-                    
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+        final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+        clientBuilder.setConnectionManager(conMan);
+        clientBuilder.setUserAgent(userAgent);
+        clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
+            @Override
+            public void process(final HttpResponse response, final HttpContext 
httpContext) throws HttpException, IOException {
+                final HttpCoreContext coreContext = 
HttpCoreContext.adapt(httpContext);
+                final ManagedHttpClientConnection conn = 
coreContext.getConnection(ManagedHttpClientConnection.class);
+                if (!conn.isOpen()) {
+                    return;
                 }
 
-                // Set the proxy if specified
-                if (context.getProperty(PROXY_HOST).isSet() && 
context.getProperty(PROXY_PORT).isSet()) {
-                    final String host = 
context.getProperty(PROXY_HOST).getValue();
-                    final int port = 
context.getProperty(PROXY_PORT).asInteger();
-                    clientBuilder.setProxy(new HttpHost(host, port));
-                }
+                final SSLSession sslSession = conn.getSSLSession();
 
-                client = clientBuilder.build();
+                if (sslSession != null) {
+                    final Certificate[] certChain = 
sslSession.getPeerCertificates();
+                    if (certChain == null || certChain.length == 0) {
+                        throw new SSLPeerUnverifiedException("No certificates 
found");
+                    }
 
-                // determine whether or not destination accepts flowfile/gzip
-                destinationAccepts = config.getDestinationAccepts();
-                if (destinationAccepts == null) {
                     try {
-                        destinationAccepts = 
getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), 
transactionId);
-                        config.setDestinationAccepts(destinationAccepts);
-                    } catch (final IOException e) {
-                        flowFile = session.penalize(flowFile);
-                        session.transfer(flowFile, REL_FAILURE);
-                        logger.error("Unable to communicate with destination 
{} to determine whether or not it can accept "
-                                + "flowfiles/gzip; routing {} to failure due 
to {}", new Object[]{url, flowFile, e});
-                        context.yield();
-                        return;
+                        final X509Certificate cert = 
CertificateUtils.convertAbstractX509Certificate(certChain[0]);
+                        dnHolder.set(cert.getSubjectDN().getName().trim());
+                    } catch (CertificateException e) {
+                        final String msg = "Could not extract subject DN from 
SSL session peer certificate";
+                        logger.warn(msg);
+                        throw new SSLPeerUnverifiedException(msg);
                     }
                 }
             }
-
-            bytesToSend += flowFile.getSize();
-            if (bytesToSend > maxBatchBytes.longValue()) {
-                break;
+        });
+
+        clientBuilder.disableAutomaticRetries();
+        clientBuilder.disableContentCompression();
+
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+        // set the credentials if appropriate
+        if (username != null) {
+            final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+            if (password == null) {
+                credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username));
+            } else {
+                credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username, password));
             }
+            clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+        }
 
-            // if we are not sending as flowfile, or if the destination 
doesn't accept V3 or V2 (streaming) format,
-            // then only use a single FlowFile
-            if (!sendAsFlowFile || !destinationAccepts.isFlowFileV3Accepted() 
&& !destinationAccepts.isFlowFileV2Accepted()) {
-                break;
+        // Set the proxy if specified
+        if (context.getProperty(PROXY_HOST).isSet() && 
context.getProperty(PROXY_PORT).isSet()) {
+            final String host = context.getProperty(PROXY_HOST).getValue();
+            final int port = context.getProperty(PROXY_PORT).asInteger();
+            clientBuilder.setProxy(new HttpHost(host, port));
+        }
+
+        client = clientBuilder.build();
+
+        // determine whether or not destination accepts flowfile/gzip
+        destinationAccepts = config.getDestinationAccepts();
+        if (destinationAccepts == null) {
+            try {
+                destinationAccepts = getDestinationAcceptance(sendAsFlowFile, 
client, url, getLogger(), transactionId);
+                config.setDestinationAccepts(destinationAccepts);
+            } catch (final IOException e) {
+                firstFlowFile = session.penalize(firstFlowFile);
+                session.transfer(firstFlowFile, REL_FAILURE);
+                logger.error("Unable to communicate with destination {} to 
determine whether or not it can accept "
+                        + "flowfiles/gzip; routing {} to failure due to {}", 
new Object[]{url, firstFlowFile, e});
+                context.yield();
+                return;
             }
         }
 
-        if (toSend.isEmpty()) {
-            return;
+        // if we are sending as flowfile and the destination accepts V3 or V2 
(streaming) format,
+        // then we can get more flowfiles from the session up to 
MAX_BATCH_SIZE for the same URL
+        if (sendAsFlowFile && (destinationAccepts.isFlowFileV3Accepted() || 
destinationAccepts.isFlowFileV2Accepted())) {
+            toSend.addAll(session.get(new FlowFileFilter() {
+                @Override
+                public FlowFileFilterResult filter(FlowFile flowFile) {
+                    // if over MAX_BATCH_SIZE, then stop adding files
+                    if (bytesToSend.get() + flowFile.getSize() > 
maxBatchBytes) {
+                        return FlowFileFilterResult.REJECT_AND_TERMINATE;
+                    }
+                    // check URL to see if this flowfile can be included in 
the batch
+                    final String urlToCheck = 
context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue();
+                    if (url.equals(urlToCheck)) {
+                        bytesToSend.addAndGet(flowFile.getSize());
+                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                    } else {
+                        return FlowFileFilterResult.REJECT_AND_CONTINUE;
+                    }
+                }
+            }));
         }
 
-        final String url = lastUrl;
         final HttpPost post = new HttpPost(url);
-        final List<FlowFile> flowFileList = toSend;
         final DestinationAccepts accepts = destinationAccepts;
         final boolean isDestinationLegacyNiFi = accepts.getProtocolVersion() 
== null;
 
@@ -609,7 +605,7 @@ public class PostHTTP extends AbstractProcessor {
                 }
 
                 try (final OutputStream out = wrappedOut) {
-                    for (final FlowFile flowFile : flowFileList) {
+                    for (final FlowFile flowFile : toSend) {
                         session.read(flowFile, new InputStreamCallback() {
                             @Override
                             public void process(final InputStream rawIn) 
throws IOException {
@@ -693,10 +689,10 @@ public class PostHTTP extends AbstractProcessor {
         }
 
         final String attributeHeaderRegex = 
context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
-        if (attributeHeaderRegex != null && !sendAsFlowFile && 
flowFileList.size() == 1) {
+        if (attributeHeaderRegex != null && !sendAsFlowFile && toSend.size() 
== 1) {
             final Pattern pattern = Pattern.compile(attributeHeaderRegex);
 
-            final Map<String, String> attributes = 
flowFileList.get(0).getAttributes();
+            final Map<String, String> attributes = 
toSend.get(0).getAttributes();
             for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
                 final String key = entry.getKey();
                 if (pattern.matcher(key).matches()) {
@@ -731,7 +727,7 @@ public class PostHTTP extends AbstractProcessor {
             // don't do this, the Connection will not be returned to the pool
             EntityUtils.consume(response.getEntity());
             stopWatch.stop();
-            uploadDataRate = stopWatch.calculateDataRate(bytesToSend);
+            uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get());
             uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
         } catch (final IOException e) {
             logger.error("Failed to Post {} due to {}; transferring to 
failure", new Object[]{flowFileDescription, e});

http://git-wip-us.apache.org/repos/asf/nifi/blob/a2f2ddd6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
index 9a46741..abb1951 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java
@@ -17,18 +17,24 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.FlowFileUnpackagerV3;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.eclipse.jetty.servlet.ServletHandler;
@@ -441,4 +447,116 @@ public class TestPostHTTP {
         
Assert.assertTrue(runner.getProcessContext().getProperty(PostHTTP.USER_AGENT).getValue().startsWith("Apache-HttpClient"));
     }
 
+    @Test // PostHTTP must only batch together FlowFiles whose URL evaluates the same
+    public void testBatchWithMultipleUrls() throws Exception {
+        CaptureServlet servletA, servletB;
+        TestServer serverA, serverB;
+
+        { // set up two test servers (A and B) so FlowFiles can target different URLs
+            setup(null);
+            servletA = servlet;
+            serverA = server;
+
+            // set up a second web service (NOTE(review): serverB is never shut down in this test - confirm it gets cleaned up)
+            ServletHandler handler = new ServletHandler();
+            handler.addServletWithMapping(CaptureServlet.class, "/*");
+
+            // create and start the second server using the CaptureServlet handler above
+            serverB = new TestServer(null);
+            serverB.addHandler(handler);
+            serverB.startServer();
+
+            servletB = (CaptureServlet) handler.getServlets()[0].getServlet();
+        }
+
+        runner.setProperty(PostHTTP.URL, "${url}"); // use EL so each FlowFile's "url" attribute picks its destination
+        runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true");
+        runner.setProperty(PostHTTP.MAX_BATCH_SIZE, "10 b");
+
+        Set<String> expectedContentA = new HashSet<>();
+        Set<String> expectedContentB = new HashSet<>();
+
+        Set<String> actualContentA = new HashSet<>();
+        Set<String> actualContentB = new HashSet<>();
+
+        // enqueue 9 FlowFiles per server (18 total), interleaving A and B in the queue
+        for (int i = 0; i < 9; i++) {
+            enqueueWithURL("a" + i, serverA.getUrl());
+            enqueueWithURL("b" + i, serverB.getUrl());
+
+            expectedContentA.add("a" + i);
+            expectedContentB.add("b" + i);
+        }
+
+        // MAX_BATCH_SIZE is 10 bytes and each file is 2 bytes, so the 18 files 
should produce 4 batches: 5 then 4 FlowFiles for each URL
+        for (int i = 0; i < 4; i++) {
+            runner.run(1);
+            runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
+            final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(PostHTTP.REL_SUCCESS);
+            assertFalse(successFiles.isEmpty());
+
+            MockFlowFile mff = successFiles.get(0);
+            final String urlAttr = mff.getAttribute("url");
+
+            if (serverA.getUrl().equals(urlAttr)) {
+                checkBatch(serverA, servletA, actualContentA, 
(actualContentA.isEmpty() ? 5 : 4));
+            } else if (serverB.getUrl().equals(urlAttr)) {
+                checkBatch(serverB, servletB, actualContentB, 
(actualContentB.isEmpty() ? 5 : 4));
+            } else {
+                fail("unexpected url attribute");
+            }
+        }
+
+        assertEquals(expectedContentA, actualContentA);
+        assertEquals(expectedContentB, actualContentB);
+
+        // make sure everything was transferred: one more run must produce no FlowFiles
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 0);
+    }
+
+    private void enqueueWithURL(String data, String url) { // enqueue `data` as content, tagged with a "url" attribute
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("url", url);
+        runner.enqueue(data.getBytes(), attrs); // getBytes() uses the platform charset; callers here pass ASCII only
+    }
+
+    private void checkBatch(TestServer server, CaptureServlet servlet, 
Set<String> actualContent, int expectedCount) throws Exception { // verify one batch POSTed to `server`, then record its content
+        FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); // the POST body is expected to be FlowFile-package V3
+        Set<String> actualFFContent = new HashSet<>();
+        Set<String> actualPostContent = new HashSet<>();
+
+        runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 
expectedCount);
+
+        // confirm that every FlowFile transferred to 'success' targets this server's 
URL
+        // also accumulate their content to compare with the POST body below
+        final List<MockFlowFile> successFlowFiles = 
runner.getFlowFilesForRelationship(PostHTTP.REL_SUCCESS);
+        for (int i = 0; i < expectedCount; i++) {
+            MockFlowFile mff = successFlowFiles.get(i);
+            mff.assertAttributeEquals("url", server.getUrl());
+            String content = new String(mff.toByteArray());
+            actualFFContent.add(content);
+        }
+
+        // unpack the servlet's last POST body and confirm every packaged FlowFile carries this server's URL
+        // also accumulate the received content to compare below
+        try (ByteArrayInputStream bais = new 
ByteArrayInputStream(servlet.getLastPost());
+            ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            for (int i = 0; i < expectedCount; i++) {
+                Map<String, String> receivedAttrs = 
unpacker.unpackageFlowFile(bais, baos);
+                String receivedContent = new String(baos.toByteArray());
+                actualPostContent.add(receivedContent);
+                assertEquals(server.getUrl(), receivedAttrs.get("url"));
+                assertTrue(unpacker.hasMoreData() || i == (expectedCount - 1)); // more data must remain until the last expected FlowFile
+                baos.reset();
+            }
+        }
+
+        // confirm that the transferred and POSTed content match
+        assertEquals(actualFFContent, actualPostContent);
+
+        // accumulate the actual content for the caller's final check, then reset transfer state for the next batch
+        actualContent.addAll(actualPostContent);
+        runner.clearTransferState();
+    }
 }

Reply via email to