Repository: nifi Updated Branches: refs/heads/NIFI-108 6d64f58d4 -> de264b1c7
NIFI-108: - Verifying two phase commit for queue listing. - Fixing checkstyle. - Ensuring drop and listing requests are merged when created when clustered. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/de264b1c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/de264b1c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/de264b1c Branch: refs/heads/NIFI-108 Commit: de264b1c741e6269cfcf456934bdae93ea4102f5 Parents: 6d64f58 Author: Matt Gilman <[email protected]> Authored: Thu Dec 17 18:07:00 2015 -0500 Committer: Matt Gilman <[email protected]> Committed: Thu Dec 17 18:07:00 2015 -0500 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 15 +++++++-- .../controller/TestStandardFlowFileQueue.java | 34 +++++++++----------- .../org/apache/nifi/web/NiFiServiceFacade.java | 8 +++++ .../nifi/web/StandardNiFiServiceFacade.java | 6 ++++ .../apache/nifi/web/api/ConnectionResource.java | 16 ++++++++- .../org/apache/nifi/web/dao/ConnectionDAO.java | 8 +++++ .../web/dao/impl/StandardConnectionDAO.java | 7 ++++ 7 files changed, 72 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 6cd95b8..3282318 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -329,8 +329,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); + public static final Pattern DROP_REQUESTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests"); public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}"); - public static final Pattern LIST_FLOWFILES_URI = Pattern + public static final Pattern LISTING_REQUESTS_URI = Pattern + .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests"); + public static final Pattern LISTING_REQUEST_URI = Pattern .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}"); private final NiFiProperties properties; @@ -2443,7 +2446,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } private static boolean isListFlowFilesEndpoint(final URI uri, final String method) { - return "GET".equalsIgnoreCase(method) && LIST_FLOWFILES_URI.matcher(uri.getPath()).matches(); + if ("GET".equalsIgnoreCase(method) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) { + return true; + } else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) { + return true; + } + + return false; } private static boolean isCountersEndpoint(final URI uri) { @@ -2491,6 +2500,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return true; } else if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) { return true; + } else if (("POST".equalsIgnoreCase(method) && DROP_REQUESTS_URI.matcher(uri.getPath()).matches())) { + return true; } return false; http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index f58d4b0..587c62f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -17,23 +17,6 @@ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileState; @@ -63,9 +46,22 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.sun.istack.logging.Logger; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; -import ch.qos.logback.classic.BasicConfigurator; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 66efb27..fe67473 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -496,6 +496,14 @@ public interface NiFiServiceFacade { ConfigurationSnapshot<ConnectionDTO> createConnection(Revision revision, String groupId, ConnectionDTO connectionDTO); /** + * Determines if this connection can be listed. + * + * @param groupId group + * @param connectionId connection + */ + void verifyListQueue(String groupId, String connectionId); + + /** * Determines if this connection can be created. * * @param groupId group http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 7982f9a..abfd5b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -219,6 +219,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // ----------------------------------------- // Verification Operations // ----------------------------------------- + + @Override + public void verifyListQueue(String groupId, String connectionId) { + connectionDAO.verifyList(groupId, connectionId); + } + @Override public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) { connectionDAO.verifyCreate(groupId, connectionDTO); http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 76dde1a..1415846 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -1031,6 +1031,7 @@ public class ConnectionResource extends ApplicationResource { @Path("/{connection-id}/flowfiles/{flowfile-uuid}") @PreAuthorize("hasRole('ROLE_DFM')") public Response deleteFlowFile( + @Context HttpServletRequest httpServletRequest, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false @@ -1072,6 +1073,12 @@ public class ConnectionResource extends ApplicationResource { } } + // handle expects request (usually from the cluster manager) + final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); + if (expects != null) { + return generateContinueResponse().build(); + } + return null; } @@ -1255,7 +1262,7 @@ public class ConnectionResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") } ) - public Response getFlowFileListing( + public Response createFlowFileListing( @Context HttpServletRequest httpServletRequest, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", @@ -1273,6 +1280,13 @@ public class ConnectionResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // handle expects request (usually from the cluster manager) + final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); + if (expects != null) { + serviceFacade.verifyListQueue(groupId, id); + return generateContinueResponse().build(); + } + // ensure the id is the same across the cluster final String listingRequestId; final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java index 3884f51..85cd9b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java @@ -121,6 +121,14 @@ public interface ConnectionDAO { ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId); /** + * Verifies the listing can be processed. + * + * @param groupId group id + * @param id connection id + */ + void verifyList(String groupId, String id); + + /** * Verifies the create request can be processed. * * @param groupId group id http://git-wip-us.apache.org/repos/asf/nifi/blob/de264b1c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 4d5d8e2..dca1c18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -391,6 +391,13 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override + public void verifyList(String groupId, String id) { + final Connection connection = locateConnection(groupId, id); + final FlowFileQueue queue = connection.getFlowFileQueue(); + queue.verifyCanList(); + } + + @Override public void verifyUpdate(String groupId, ConnectionDTO connectionDTO) { final ProcessGroup group = locateProcessGroup(flowController, groupId); verifyUpdate(locateConnection(group, connectionDTO.getId()), connectionDTO);
