Repository: nifi
Updated Branches:
  refs/heads/NIFI-108 6d64f58d4 -> de264b1c7


NIFI-108:
- Verifying two-phase commit for queue listing.
- Fixing checkstyle.
- Ensuring drop and listing requests are merged upon creation when clustered.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/de264b1c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/de264b1c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/de264b1c

Branch: refs/heads/NIFI-108
Commit: de264b1c741e6269cfcf456934bdae93ea4102f5
Parents: 6d64f58
Author: Matt Gilman <[email protected]>
Authored: Thu Dec 17 18:07:00 2015 -0500
Committer: Matt Gilman <[email protected]>
Committed: Thu Dec 17 18:07:00 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 15 +++++++--
 .../controller/TestStandardFlowFileQueue.java   | 34 +++++++++-----------
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  8 +++++
 .../nifi/web/StandardNiFiServiceFacade.java     |  6 ++++
 .../apache/nifi/web/api/ConnectionResource.java | 16 ++++++++-
 .../org/apache/nifi/web/dao/ConnectionDAO.java  |  8 +++++
 .../web/dao/impl/StandardConnectionDAO.java     |  7 ++++
 7 files changed, 72 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 6cd95b8..3282318 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -329,8 +329,11 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
     @Deprecated
     public static final Pattern QUEUE_CONTENTS_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
+    public static final Pattern DROP_REQUESTS_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests");
     public static final Pattern DROP_REQUEST_URI = 
Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
-    public static final Pattern LIST_FLOWFILES_URI = Pattern
+    public static final Pattern LISTING_REQUESTS_URI = Pattern
+        
.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests");
+    public static final Pattern LISTING_REQUEST_URI = Pattern
         
.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
 
     private final NiFiProperties properties;
@@ -2443,7 +2446,13 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     }
 
     private static boolean isListFlowFilesEndpoint(final URI uri, final String 
method) {
-        return "GET".equalsIgnoreCase(method) && 
LIST_FLOWFILES_URI.matcher(uri.getPath()).matches();
+        if ("GET".equalsIgnoreCase(method) && 
LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if ("POST".equalsIgnoreCase(method) && 
LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
     }
 
     private static boolean isCountersEndpoint(final URI uri) {
@@ -2491,6 +2500,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             return true;
         } else if (("GET".equalsIgnoreCase(method) || 
"DELETE".equalsIgnoreCase(method)) && 
DROP_REQUEST_URI.matcher(uri.getPath()).matches()) {
             return true;
+        } else if (("POST".equalsIgnoreCase(method) && 
DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) {
+            return true;
         }
 
         return false;

http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index f58d4b0..587c62f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -17,23 +17,6 @@
 
 package org.apache.nifi.controller;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
@@ -63,9 +46,22 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.sun.istack.logging.Logger;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
-import ch.qos.logback.classic.BasicConfigurator;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestStandardFlowFileQueue {
     private TestSwapManager swapManager = null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 66efb27..fe67473 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -496,6 +496,14 @@ public interface NiFiServiceFacade {
     ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, 
String groupId, ConnectionDTO connectionDTO);
 
     /**
+     * Determines if this connection can be listed.
+     *
+     * @param groupId group
+     * @param connectionId connection
+     */
+    void verifyListQueue(String groupId, String connectionId);
+
+    /**
      * Determines if this connection can be created.
      *
      * @param groupId group

http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 7982f9a..abfd5b8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -219,6 +219,12 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     // -----------------------------------------
     // Verification Operations
     // -----------------------------------------
+
+    @Override
+    public void verifyListQueue(String groupId, String connectionId) {
+        connectionDAO.verifyList(groupId, connectionId);
+    }
+
     @Override
     public void verifyCreateConnection(String groupId, ConnectionDTO 
connectionDTO) {
         connectionDAO.verifyCreate(groupId, connectionDTO);

http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 76dde1a..1415846 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -1031,6 +1031,7 @@ public class ConnectionResource extends 
ApplicationResource {
     @Path("/{connection-id}/flowfiles/{flowfile-uuid}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     public Response deleteFlowFile(
+            @Context HttpServletRequest httpServletRequest,
             @ApiParam(
                 value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
                 required = false
@@ -1072,6 +1073,12 @@ public class ConnectionResource extends 
ApplicationResource {
             }
         }
 
+        // handle expects request (usually from the cluster manager)
+        final String expects = 
httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            return generateContinueResponse().build();
+        }
+
         return null;
     }
 
@@ -1255,7 +1262,7 @@ public class ConnectionResource extends 
ApplicationResource {
             @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
         }
     )
-    public Response getFlowFileListing(
+    public Response createFlowFileListing(
             @Context HttpServletRequest httpServletRequest,
             @ApiParam(
                 value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
@@ -1273,6 +1280,13 @@ public class ConnectionResource extends 
ApplicationResource {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // handle expects request (usually from the cluster manager)
+        final String expects = 
httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            serviceFacade.verifyListQueue(groupId, id);
+            return generateContinueResponse().build();
+        }
+
         // ensure the id is the same across the cluster
         final String listingRequestId;
         final ClusterContext clusterContext = 
ClusterContextThreadLocal.getContext();

http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index 3884f51..85cd9b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -121,6 +121,14 @@ public interface ConnectionDAO {
     ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, 
String listingRequestId);
 
     /**
+     * Verifies the listing can be processed.
+     *
+     * @param groupId group id
+     * @param id connection id
+     */
+    void verifyList(String groupId, String id);
+
+    /**
      * Verifies the create request can be processed.
      *
      * @param groupId group id

http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 4d5d8e2..dca1c18 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -391,6 +391,13 @@ public class StandardConnectionDAO extends ComponentDAO 
implements ConnectionDAO
     }
 
     @Override
+    public void verifyList(String groupId, String id) {
+        final Connection connection = locateConnection(groupId, id);
+        final FlowFileQueue queue = connection.getFlowFileQueue();
+        queue.verifyCanList();
+    }
+
+    @Override
     public void verifyUpdate(String groupId, ConnectionDTO connectionDTO) {
         final ProcessGroup group = locateProcessGroup(flowController, groupId);
         verifyUpdate(locateConnection(group, connectionDTO.getId()), 
connectionDTO);

Reply via email to