http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java ---------------------------------------------------------------------- diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java new file mode 100644 index 0000000..8762706 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java @@ -0,0 +1,936 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.service; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.exception.ResourceNotFoundException; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.metadata.BucketMetadata; +import org.apache.nifi.registry.metadata.FlowMetadata; +import org.apache.nifi.registry.metadata.FlowSnapshotMetadata; +import org.apache.nifi.registry.metadata.MetadataProvider; +import org.apache.nifi.registry.metadata.StandardBucketMetadata; +import org.apache.nifi.registry.metadata.StandardFlowMetadata; +import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata; +import org.apache.nifi.registry.serialization.FlowSnapshotSerializer; +import org.apache.nifi.registry.serialization.Serializer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import javax.validation.ConstraintViolationException; +import javax.validation.Validation; +import javax.validation.Validator; +import javax.validation.ValidatorFactory; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestRegistryService { + + private MetadataProvider metadataProvider; + private FlowPersistenceProvider flowPersistenceProvider; + private Serializer<VersionedFlowSnapshot> snapshotSerializer; + private Validator validator; + + private RegistryService registryService; + + @Before + public void setup() { + metadataProvider = mock(MetadataProvider.class); + flowPersistenceProvider = mock(FlowPersistenceProvider.class); + snapshotSerializer = mock(FlowSnapshotSerializer.class); + + final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); + validator = validatorFactory.getValidator(); + + registryService = new RegistryService(metadataProvider, flowPersistenceProvider, snapshotSerializer, validator); + } + + // ---------------------- Test Bucket methods --------------------------------------------- + + @Test + public void testCreateBucketValid() { + final Bucket bucket = new Bucket(); + bucket.setName("My Bucket"); + bucket.setDescription("This is my bucket."); + + when(metadataProvider.getBucketByName(bucket.getName())).thenReturn(null); + + doAnswer(createBucketAnswer()).when(metadataProvider).createBucket(any(BucketMetadata.class)); + + final Bucket createdBucket = registryService.createBucket(bucket); + assertNotNull(createdBucket); + assertNotNull(createdBucket.getIdentifier()); + assertNotNull(createdBucket.getCreatedTimestamp()); + + assertEquals(bucket.getName(), createdBucket.getName()); + assertEquals(bucket.getDescription(), createdBucket.getDescription()); + } + + @Test(expected = IllegalStateException.class) + public void testCreateBucketWithSameName() { + final Bucket bucket = new Bucket(); + bucket.setName("My Bucket"); + bucket.setDescription("This is my bucket."); + + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketByName(bucket.getName())).thenReturn(existingBucket); + + // should throw exception since a bucket with the same name exists + registryService.createBucket(bucket); + } + + @Test(expected = ConstraintViolationException.class) + public void testCreateBucketWithMissingName() { + final Bucket bucket = new Bucket(); + when(metadataProvider.getBucketByName(bucket.getName())).thenReturn(null); + registryService.createBucket(bucket); + } + + @Test + public void testGetExistingBucket() { + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + final Bucket bucket = registryService.getBucket(existingBucket.getIdentifier()); + assertNotNull(bucket); + assertEquals(existingBucket.getIdentifier(), bucket.getIdentifier()); + assertEquals(existingBucket.getName(), bucket.getName()); + assertEquals(existingBucket.getDescription(), bucket.getDescription()); + assertEquals(existingBucket.getCreatedTimestamp(), bucket.getCreatedTimestamp()); + } + + @Test(expected = ResourceNotFoundException.class) + public void testGetBucketDoesNotExist() { + when(metadataProvider.getBucketById(any(String.class))).thenReturn(null); + registryService.getBucket("does-not-exist"); + } + + @Test(expected = IllegalArgumentException.class) + public void testUpdateBucketWithoutId() { + final Bucket bucket = new Bucket(); + bucket.setName("My Bucket"); + bucket.setDescription("This is my bucket."); + registryService.updateBucket(bucket); + } + + @Test(expected = ResourceNotFoundException.class) + public void testUpdateBucketDoesNotExist() { + final Bucket bucket = new Bucket(); + bucket.setIdentifier("b1"); + bucket.setName("My Bucket"); + bucket.setDescription("This is my bucket."); + registryService.updateBucket(bucket); + + when(metadataProvider.getBucketById(any(String.class))).thenReturn(null); + registryService.updateBucket(bucket); + } + + @Test(expected = IllegalStateException.class) + public void testUpdateBucketWithSameNameAsExistingBucket() { + final BucketMetadata bucketToUpdate = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(bucketToUpdate.getIdentifier())).thenReturn(bucketToUpdate); + + final BucketMetadata otherBucket = new StandardBucketMetadata.Builder() + .identifier("b2") + .name("My Bucket #2") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketByName(otherBucket.getName())).thenReturn(otherBucket); + + // should fail because other bucket has the same name + final Bucket updatedBucket = new Bucket(); + updatedBucket.setIdentifier(bucketToUpdate.getIdentifier()); + updatedBucket.setName("My Bucket #2"); + updatedBucket.setDescription(bucketToUpdate.getDescription()); + + registryService.updateBucket(updatedBucket); + } + + @Test + public void testUpdateBucket() { + final BucketMetadata bucketToUpdate = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(bucketToUpdate.getIdentifier())).thenReturn(bucketToUpdate); + + doAnswer(updateBucketAnswer()).when(metadataProvider).updateBucket(any(BucketMetadata.class)); + + final Bucket updatedBucket = new Bucket(); + updatedBucket.setIdentifier(bucketToUpdate.getIdentifier()); + updatedBucket.setName("Updated Name"); + updatedBucket.setDescription("Updated Description"); + + final Bucket result = registryService.updateBucket(updatedBucket); + assertNotNull(result); + assertEquals(updatedBucket.getName(), result.getName()); + assertEquals(updatedBucket.getDescription(), result.getDescription()); + } + + @Test + public void testUpdateBucketPartial() { + final BucketMetadata bucketToUpdate = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(bucketToUpdate.getIdentifier())).thenReturn(bucketToUpdate); + + doAnswer(updateBucketAnswer()).when(metadataProvider).updateBucket(any(BucketMetadata.class)); + + final Bucket updatedBucket = new Bucket(); + updatedBucket.setIdentifier(bucketToUpdate.getIdentifier()); + updatedBucket.setName("Updated Name"); + updatedBucket.setDescription(null); + + // name should be updated but description should not be changed + final Bucket result = registryService.updateBucket(updatedBucket); + assertNotNull(result); + assertEquals(updatedBucket.getName(), result.getName()); + assertEquals(bucketToUpdate.getDescription(), result.getDescription()); + } + + @Test(expected = ResourceNotFoundException.class) + public void testDeleteBucketDoesNotExist() { + final String bucketId = "b1"; + when(metadataProvider.getBucketById(bucketId)).thenReturn(null); + registryService.deleteBucket(bucketId); + } + + @Test + public void testDeleteBucketWithFlows() { + final BucketMetadata bucketToDelete = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(bucketToDelete.getIdentifier())).thenReturn(bucketToDelete); + + final FlowMetadata flowToDelete = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("Flow 1") + .description("This is flow 1") + .created(System.currentTimeMillis()) + .build(); + + final Set<FlowMetadata> flowsToDelete = new HashSet<>(); + flowsToDelete.add(flowToDelete); + + when(metadataProvider.getFlows(bucketToDelete.getIdentifier())).thenReturn(flowsToDelete); + + final Bucket deletedBucket = registryService.deleteBucket(bucketToDelete.getIdentifier()); + assertNotNull(deletedBucket); + assertEquals(bucketToDelete.getIdentifier(), deletedBucket.getIdentifier()); + + verify(flowPersistenceProvider, times(1)) + .deleteSnapshots(eq(bucketToDelete.getIdentifier()), eq(flowToDelete.getIdentifier())); + } + + // ---------------------- Test VersionedFlow methods --------------------------------------------- + + @Test(expected = ConstraintViolationException.class) + public void testCreateFlowInvalid() { + final VersionedFlow versionedFlow = new VersionedFlow(); + registryService.createFlow("b1", versionedFlow); + } + + @Test(expected = ResourceNotFoundException.class) + public void testCreateFlowBucketDoesNotExist() { + + when(metadataProvider.getBucketById(any(String.class))).thenReturn(null); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setName("My Flow"); + versionedFlow.setBucketIdentifier("b1"); + + registryService.createFlow(versionedFlow.getBucketIdentifier(), versionedFlow); + } + + @Test(expected = IllegalStateException.class) + public void testCreateFlowWithSameName() { + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + // setup a flow with the same name that already exists + + final FlowMetadata flowMetadata = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowByName(flowMetadata.getName())).thenReturn(flowMetadata); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setName(flowMetadata.getName()); + versionedFlow.setBucketIdentifier("b1"); + + registryService.createFlow(versionedFlow.getBucketIdentifier(), versionedFlow); + } + + @Test + public void testCreateFlowValid() { + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setName("My Flow"); + versionedFlow.setBucketIdentifier("b1"); + + doAnswer(createFlowAnswer()).when(metadataProvider).createFlow(any(String.class), any(FlowMetadata.class)); + + final VersionedFlow createdFlow = registryService.createFlow(versionedFlow.getBucketIdentifier(), versionedFlow); + assertNotNull(createdFlow); + assertNotNull(createdFlow.getIdentifier()); + assertTrue(createdFlow.getCreatedTimestamp() > 0); + assertTrue(createdFlow.getModifiedTimestamp() > 0); + assertEquals(versionedFlow.getName(), createdFlow.getName()); + assertEquals(versionedFlow.getBucketIdentifier(), createdFlow.getBucketIdentifier()); + assertEquals(versionedFlow.getDescription(), createdFlow.getDescription()); + } + + @Test(expected = ResourceNotFoundException.class) + public void testGetFlowDoesNotExist() { + when(metadataProvider.getFlowById(any(String.class))).thenReturn(null); + registryService.getFlow("flow1"); + } + + @Test + public void testGetFlowExists() { + final FlowMetadata flowMetadata = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowById(flowMetadata.getIdentifier())).thenReturn(flowMetadata); + + final VersionedFlow versionedFlow = registryService.getFlow(flowMetadata.getIdentifier()); + assertNotNull(versionedFlow); + assertEquals(flowMetadata.getIdentifier(), versionedFlow.getIdentifier()); + assertEquals(flowMetadata.getName(), versionedFlow.getName()); + assertEquals(flowMetadata.getDescription(), versionedFlow.getDescription()); + assertEquals(flowMetadata.getBucketIdentifier(), versionedFlow.getBucketIdentifier()); + assertEquals(flowMetadata.getCreatedTimestamp(), versionedFlow.getCreatedTimestamp()); + assertEquals(flowMetadata.getModifiedTimestamp(), versionedFlow.getModifiedTimestamp()); + } + + @Test + public void testGetFlows() { + final FlowMetadata flowMetadata1 = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + final FlowMetadata flowMetadata2 = new StandardFlowMetadata.Builder() + .identifier("flow2") + .name("My Flow") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + final Set<FlowMetadata> flows = new LinkedHashSet<>(); + flows.add(flowMetadata1); + flows.add(flowMetadata2); + + when(metadataProvider.getFlows()).thenReturn(flows); + + final Set<VersionedFlow> allFlows = registryService.getFlows(); + assertNotNull(allFlows); + assertEquals(2, allFlows.size()); + } + + @Test(expected = ResourceNotFoundException.class) + public void testGetFlowsByBucketDoesNotExist() { + when(metadataProvider.getBucketById(any(String.class))).thenReturn(null); + registryService.getFlows("b1"); + } + + @Test + public void testGetFlowsByBucketExists() { + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier("b1") + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + final FlowMetadata flowMetadata1 = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + final FlowMetadata flowMetadata2 = new StandardFlowMetadata.Builder() + .identifier("flow2") + .name("My Flow") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + final Set<FlowMetadata> flows = new LinkedHashSet<>(); + flows.add(flowMetadata1); + flows.add(flowMetadata2); + + when(metadataProvider.getFlows(existingBucket.getIdentifier())).thenReturn(flows); + + final Set<VersionedFlow> allFlows = registryService.getFlows(existingBucket.getIdentifier()); + assertNotNull(allFlows); + assertEquals(2, allFlows.size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testUpdateFlowWithoutId() { + final VersionedFlow versionedFlow = new VersionedFlow(); + registryService.updateFlow(versionedFlow); + } + + @Test(expected = ResourceNotFoundException.class) + public void testUpdateFlowDoesNotExist() { + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setIdentifier("flow1"); + + when(metadataProvider.getFlowById(versionedFlow.getIdentifier())).thenReturn(null); + + registryService.updateFlow(versionedFlow); + } + + @Test(expected = IllegalStateException.class) + public void testUpdateFlowWithSameNameAsExistingFlow() { + final FlowMetadata flowToUpdate = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowById(flowToUpdate.getIdentifier())).thenReturn(flowToUpdate); + + final FlowMetadata otherFlow = new StandardFlowMetadata.Builder() + .identifier("flow2") + .name("My Flow 2") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowByName(otherFlow.getName())).thenReturn(otherFlow); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setIdentifier(flowToUpdate.getIdentifier()); + versionedFlow.setName(otherFlow.getName()); + + registryService.updateFlow(versionedFlow); + } + + @Test + public void testUpdateFlow() throws InterruptedException { + final FlowMetadata flowToUpdate = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowById(flowToUpdate.getIdentifier())).thenReturn(flowToUpdate); + when(metadataProvider.getFlowByName(flowToUpdate.getName())).thenReturn(flowToUpdate); + + doAnswer(updateFlowAnswer()).when(metadataProvider).updateFlow(any(FlowMetadata.class)); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setIdentifier(flowToUpdate.getIdentifier()); + versionedFlow.setName("New Flow Name"); + versionedFlow.setDescription("This is a new description"); + + Thread.sleep(10); + + final VersionedFlow updatedFlow = registryService.updateFlow(versionedFlow); + assertNotNull(updatedFlow); + assertEquals(versionedFlow.getIdentifier(), updatedFlow.getIdentifier()); + + // name and description should be updated + assertEquals(versionedFlow.getName(), updatedFlow.getName()); + assertEquals(versionedFlow.getDescription(), updatedFlow.getDescription()); + + // other fields should not be updated + assertEquals(flowToUpdate.getBucketIdentifier(), updatedFlow.getBucketIdentifier()); + assertEquals(flowToUpdate.getCreatedTimestamp(), updatedFlow.getCreatedTimestamp()); + + // modified timestamp should be auto updated + assertTrue(updatedFlow.getModifiedTimestamp() > flowToUpdate.getModifiedTimestamp()); + } + + @Test(expected = ResourceNotFoundException.class) + public void testDeleteFlowDoesNotExist() { + when(metadataProvider.getFlowById(any(String.class))).thenReturn(null); + registryService.deleteFlow("flow1"); + } + + @Test + public void testDeleteFlowWithSnapshots() { + final FlowMetadata flowToDelete = new StandardFlowMetadata.Builder() + .identifier("flow1") + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier("b1") + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowById(flowToDelete.getIdentifier())).thenReturn(flowToDelete); + when(metadataProvider.getFlowByName(flowToDelete.getName())).thenReturn(flowToDelete); + + final VersionedFlow deletedFlow = registryService.deleteFlow(flowToDelete.getIdentifier()); + assertNotNull(deletedFlow); + assertEquals(flowToDelete.getIdentifier(), deletedFlow.getIdentifier()); + + verify(flowPersistenceProvider, times(1)) + .deleteSnapshots(flowToDelete.getBucketIdentifier(), flowToDelete.getIdentifier()); + + verify(metadataProvider, times(1)) + .deleteFlow(flowToDelete.getIdentifier()); + } + + // ---------------------- Test VersionedFlowSnapshot methods --------------------------------------------- + + private VersionedFlowSnapshot createSnapshot() { + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setFlowIdentifier("flow1"); + snapshotMetadata.setFlowName("First Flow"); + snapshotMetadata.setVersion(1); + snapshotMetadata.setComments("This is the first snapshot"); + snapshotMetadata.setBucketIdentifier("b1"); + + final VersionedProcessGroup processGroup = new VersionedProcessGroup(); + processGroup.setIdentifier("pg1"); + processGroup.setName("My Process Group"); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setSnapshotMetadata(snapshotMetadata); + snapshot.setFlowContents(processGroup); + + return snapshot; + } + + @Test(expected = ConstraintViolationException.class) + public void testCreateSnapshotInvalidMetadata() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + snapshot.getSnapshotMetadata().setFlowName(null); + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = ConstraintViolationException.class) + public void testCreateSnapshotInvalidFlowContents() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + snapshot.setFlowContents(null); + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = ConstraintViolationException.class) + public void testCreateSnapshotNullMetadata() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + snapshot.setSnapshotMetadata(null); + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = ConstraintViolationException.class) + public void testCreateSnapshotNullFlowContents() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + snapshot.setFlowContents(null); + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = ResourceNotFoundException.class) + public void testCreateSnapshotBucketDoesNotExist() { + when(metadataProvider.getBucketById(any(String.class))).thenReturn(null); + + final VersionedFlowSnapshot snapshot = createSnapshot(); + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = ResourceNotFoundException.class) + public void testCreateSnapshotFlowDoesNotExist() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + when(metadataProvider.getFlowById(snapshot.getSnapshotMetadata().getFlowIdentifier())).thenReturn(null); + + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = IllegalStateException.class) + public void testCreateSnapshotVersionAlreadyExists() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + // make a snapshot that has the same version as the one being created + final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder() + .flowIdentifier(snapshot.getSnapshotMetadata().getFlowIdentifier()) + .flowName(snapshot.getSnapshotMetadata().getFlowName()) + .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .version(snapshot.getSnapshotMetadata().getVersion()) + .comments("This is an existing snapshot") + .created(System.currentTimeMillis()) + .build(); + + // return a flow with the existing snapshot when getFlowById is called + final FlowMetadata existingFlow = new StandardFlowMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getFlowIdentifier()) + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .addSnapshot(existingSnapshot) + .build(); + + when(metadataProvider.getFlowById(existingFlow.getIdentifier())).thenReturn(existingFlow); + + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = IllegalStateException.class) + public void testCreateSnapshotVersionNotNextVersion() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + // make a snapshot for version 1 + final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder() + .flowIdentifier(snapshot.getSnapshotMetadata().getFlowIdentifier()) + .flowName(snapshot.getSnapshotMetadata().getFlowName()) + .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .version(1) + .comments("This is an existing snapshot") + .created(System.currentTimeMillis()) + .build(); + + // return a flow with the existing snapshot when getFlowById is called + final FlowMetadata existingFlow = new StandardFlowMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getFlowIdentifier()) + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .addSnapshot(existingSnapshot) + .build(); + + when(metadataProvider.getFlowById(existingFlow.getIdentifier())).thenReturn(existingFlow); + + // set the version to something that is not the next one-up version + snapshot.getSnapshotMetadata().setVersion(100); + registryService.createFlowSnapshot(snapshot); + } + + @Test + public void testCreateFirstSnapshot() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + // return a flow with the existing snapshot when getFlowById is called + final FlowMetadata existingFlow = new StandardFlowMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getFlowIdentifier()) + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowById(existingFlow.getIdentifier())).thenReturn(existingFlow); + + final VersionedFlowSnapshot createdSnapshot = registryService.createFlowSnapshot(snapshot); + assertNotNull(createdSnapshot); + + verify(snapshotSerializer, times(1)).serialize(eq(snapshot), any(OutputStream.class)); + verify(flowPersistenceProvider, times(1)).saveSnapshot(any(), any()); + verify(metadataProvider, times(1)).createFlowSnapshot(any(FlowSnapshotMetadata.class)); + } + + @Test(expected = IllegalStateException.class) + public void testCreateFirstSnapshotWithBadVersion() { + final VersionedFlowSnapshot snapshot = createSnapshot(); + + final BucketMetadata existingBucket = new StandardBucketMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .name("My Bucket #1") + .description("This is my bucket.") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket); + + // return a flow with the existing snapshot when getFlowById is called + final FlowMetadata existingFlow = new StandardFlowMetadata.Builder() + .identifier(snapshot.getSnapshotMetadata().getFlowIdentifier()) + .name("My Flow 1") + .description("This is my flow.") + .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier()) + .created(System.currentTimeMillis()) + .modified(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowById(existingFlow.getIdentifier())).thenReturn(existingFlow); + + // set the first version to something other than 1 + snapshot.getSnapshotMetadata().setVersion(100); + registryService.createFlowSnapshot(snapshot); + } + + @Test(expected = ResourceNotFoundException.class) + public void testGetSnapshotDoesNotExistInMetadataProvider() { + final String flowId = "flow1"; + final Integer version = 1; + when(metadataProvider.getFlowSnapshot(flowId, version)).thenReturn(null); + registryService.getFlowSnapshot(flowId, version); + } + + @Test(expected = IllegalStateException.class) + public void testGetSnapshotDoesNotExistInPersistenceProvider() { + final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder() + .bucketIdentifier("b1") + .flowIdentifier("flow1") + .flowName("Flow 1") + .version(1) + .comments("This is snapshot 1") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion())) + .thenReturn(existingSnapshot); + + when(flowPersistenceProvider.getSnapshot( + existingSnapshot.getBucketIdentifier(), + existingSnapshot.getFlowIdentifier(), + existingSnapshot.getVersion() + )).thenReturn(null); + + registryService.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion()); + } + + @Test + public void testGetSnapshotExists() { + final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder() + .bucketIdentifier("b1") + .flowIdentifier("flow1") + .flowName("Flow 1") + .version(1) + .comments("This is snapshot 1") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion())) + .thenReturn(existingSnapshot); + + // return a non-null, non-zero-length array so something gets passed to the serializer + when(flowPersistenceProvider.getSnapshot( + existingSnapshot.getBucketIdentifier(), + existingSnapshot.getFlowIdentifier(), + existingSnapshot.getVersion() + )).thenReturn(new byte[10]); + + final VersionedFlowSnapshot snapshotToDeserialize = createSnapshot(); + when(snapshotSerializer.deserialize(any(InputStream.class))).thenReturn(snapshotToDeserialize); + + final VersionedFlowSnapshot returnedSnapshot = registryService.getFlowSnapshot( + existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion()); + assertNotNull(returnedSnapshot); + } + + @Test(expected = ResourceNotFoundException.class) + public void testDeleteSnapshotDoesNotExist() { + final String flowId = "flow1"; + final Integer version = 1; + when(metadataProvider.getFlowSnapshot(flowId, version)).thenReturn(null); + registryService.deleteFlowSnapshot(flowId, version); + } + + @Test + public void testDeleteSnapshotExists() { + final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder() + .bucketIdentifier("b1") + .flowIdentifier("flow1") + .flowName("Flow 1") + .version(1) + .comments("This is snapshot 1") + .created(System.currentTimeMillis()) + .build(); + + when(metadataProvider.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion())) + .thenReturn(existingSnapshot); + + final VersionedFlowSnapshotMetadata deletedSnapshot = registryService.deleteFlowSnapshot( + existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion()); + assertNotNull(deletedSnapshot); + assertEquals(existingSnapshot.getFlowIdentifier(), deletedSnapshot.getFlowIdentifier()); + + verify(flowPersistenceProvider, times(1)).deleteSnapshot( + existingSnapshot.getBucketIdentifier(), + existingSnapshot.getFlowIdentifier(), + existingSnapshot.getVersion() + ); + + verify(metadataProvider, times(1)).deleteFlowSnapshot( + existingSnapshot.getFlowIdentifier(), + existingSnapshot.getVersion() + ); + } + + // ------------------------------------------------------------------- + + private Answer<BucketMetadata> createBucketAnswer() { + return (InvocationOnMock invocation) -> { + BucketMetadata bucketMetadata = (BucketMetadata) invocation.getArguments()[0]; + return bucketMetadata; + }; + } + + private Answer<BucketMetadata> updateBucketAnswer() { + return (InvocationOnMock invocation) -> { + BucketMetadata bucketMetadata = (BucketMetadata) invocation.getArguments()[0]; + return bucketMetadata; + }; + } + + private Answer<FlowMetadata> createFlowAnswer() { + return (InvocationOnMock invocation) -> { + final FlowMetadata flowMetadata = (FlowMetadata) invocation.getArguments()[1]; + return flowMetadata; + }; + } + + private Answer<FlowMetadata> updateFlowAnswer() { + return (InvocationOnMock invocation) -> { + final FlowMetadata flowMetadata = (FlowMetadata) invocation.getArguments()[0]; + return flowMetadata; + }; + } +}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java index 727aae0..3ddcadc 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java @@ -42,9 +42,16 @@ public interface MetadataProvider extends Provider { * @param bucketIdentifier the id of the bucket to retrieve * @return the bucket with the given id, or null if it does not exist */ - BucketMetadata getBucket(String bucketIdentifier); + BucketMetadata getBucketById(String bucketIdentifier); /** + * Retrieves the bucket with the given name. The name comparison must be case-insensitive. + * + * @param name the name of the bucket to retrieve + * @return the bucket with the given name, or null if it does not exist + */ + BucketMetadata getBucketByName(String name); + /** * Updates the given bucket, only the name and description should be allowed to be updated. * * @param bucket the updated bucket to save @@ -53,7 +60,7 @@ public interface MetadataProvider extends Provider { BucketMetadata updateBucket(BucketMetadata bucket); /** - * Deletes the bucket with the given identifier if one exists. + * Deletes the bucket with the given identifier, as well as any objects that reference the bucket. * * @param bucketIdentifier the id of the bucket to delete */ @@ -82,7 +89,15 @@ public interface MetadataProvider extends Provider { * @param flowIdentifier the identifier of the flow to retrieve * @return the versioned flow with the given id, or null if no flow with the given id exists */ - FlowMetadata getFlow(String flowIdentifier); + FlowMetadata getFlowById(String flowIdentifier); + + /** + * Retrieves the versioned flow with the given name. The name comparison must be case-insensitive. + * + * @param name the name of the flow to retrieve + * @return the versioned flow with the given name, or null if no flow with the given name exists + */ + FlowMetadata getFlowByName(String name); /** * Updates the given versioned flow, only the name and description should be allowed to be updated. http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java index 05ddd01..dd3e1ec 100644 --- a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java +++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java @@ -62,7 +62,7 @@ public class FileSystemMetadataProvider implements MetadataProvider { try { return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileSystemMetadataProvider.class.getClassLoader()); } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext."); + throw new RuntimeException("Unable to create JAXBContext.", e); } } @@ -156,23 +156,33 @@ public class FileSystemMetadataProvider implements MetadataProvider { metadata.getBuckets().getBucket().add(jaxbBucket); saveAndRefresh(metadata); - return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier()); + return metadataHolder.get().getBucketsById().get(bucket.getIdentifier()); } @Override - public BucketMetadata getBucket(final String bucketIdentifier) { + public BucketMetadata getBucketById(final String bucketIdentifier) { if (bucketIdentifier == null) { throw new IllegalArgumentException("Bucket Identifier cannot be null"); } final MetadataHolder holder = metadataHolder.get(); - return holder.getBucketsBydId().get(bucketIdentifier); + return holder.getBucketsById().get(bucketIdentifier); + } + + @Override + public BucketMetadata getBucketByName(String name) { + if (name == null) { + throw new IllegalArgumentException("Bucket Name cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + return holder.getBucketsByName().get(name.toLowerCase()); } @Override public Set<BucketMetadata> getBuckets() { final MetadataHolder holder = metadataHolder.get(); - final Map<String,BucketMetadata> bucketsBydId = holder.getBucketsBydId(); + final Map<String,BucketMetadata> bucketsBydId = holder.getBucketsById(); return new HashSet<>(bucketsBydId.values()); } @@ -198,7 +208,7 @@ public class FileSystemMetadataProvider implements MetadataProvider { jaxbBucket.setDescription(bucket.getDescription()); saveAndRefresh(holder.getMetadata()); - return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier()); + return metadataHolder.get().getBucketsById().get(bucket.getIdentifier()); } @Override @@ -251,7 +261,7 @@ public class FileSystemMetadataProvider implements MetadataProvider { final MetadataHolder holder = metadataHolder.get(); - final BucketMetadata bucket = holder.getBucketsBydId().get(bucketIdentifier); + final BucketMetadata bucket = holder.getBucketsById().get(bucketIdentifier); if (bucket == null) { throw new IllegalStateException("Unable to create Versioned Flow because Bucket does not exist with id " + bucketIdentifier); } @@ -272,7 +282,7 @@ public class FileSystemMetadataProvider implements MetadataProvider { } @Override - public FlowMetadata getFlow(final String flowIdentifier) { + public FlowMetadata getFlowById(final String flowIdentifier) { if (flowIdentifier == null) { throw new IllegalArgumentException("Flow Identifier cannot be null"); } @@ -282,6 +292,17 @@ public class FileSystemMetadataProvider implements MetadataProvider { } @Override + public FlowMetadata getFlowByName(final String name) { + if (name == null) { + throw new IllegalArgumentException("Flow Name cannot be null"); + } + + final MetadataHolder holder = metadataHolder.get(); + return holder.getFlowsByName().get(name.toLowerCase()); + } + + + @Override public Set<FlowMetadata> getFlows() { final MetadataHolder holder = metadataHolder.get(); final Map<String,FlowMetadata> flowsById = holder.getFlowsById(); http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java index 4264b01..e49a62b 100644 --- a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java +++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java @@ -36,13 +36,17 @@ public class MetadataHolder { private final Metadata metadata; private final Map<String,Set<FlowMetadata>> flowsByBucket; private final Map<String,FlowMetadata> flowsById; + private final Map<String,FlowMetadata> flowsByName; private final Map<String,BucketMetadata> bucketsById; + private final Map<String,BucketMetadata> bucketsByName; public MetadataHolder(final Metadata metadata) { this.metadata = metadata; this.flowsByBucket = Collections.unmodifiableMap(createFlowsByBucket(metadata)); + this.flowsByName = Collections.unmodifiableMap(createFlowsByName(flowsByBucket)); this.flowsById = Collections.unmodifiableMap(createFlowsById(flowsByBucket)); this.bucketsById = Collections.unmodifiableMap(createBucketsBydId(metadata, flowsByBucket)); + this.bucketsByName = Collections.unmodifiableMap(createBucketsByName(bucketsById)); } private Map<String,BucketMetadata> createBucketsBydId(final Metadata metadata, final Map<String,Set<FlowMetadata>> flowsByBucket) { @@ -60,6 +64,12 @@ public class MetadataHolder { return bucketsById; } + private Map<String,BucketMetadata> createBucketsByName(Map<String,BucketMetadata> bucketsById) { + final Map<String,BucketMetadata> bucketsByName = new HashMap<>(); + bucketsById.values().stream().forEach(b -> bucketsByName.put(b.getName().toLowerCase(), b)); + return bucketsByName; + } + private BucketMetadata createBucketMetadata(final Bucket jaxbBucket, final Set<FlowMetadata> bucketFlows) { return new StandardBucketMetadata.Builder() .identifier(jaxbBucket.getIdentifier()) @@ -116,29 +126,49 @@ public class MetadataHolder { } private Map<String,FlowMetadata> createFlowsById(final Map<String,Set<FlowMetadata>> flowsByBucket) { - final Map<String,FlowMetadata> flowsBdId = new HashMap<>(); + final Map<String,FlowMetadata> flowsById = new HashMap<>(); for (final Map.Entry<String,Set<FlowMetadata>> entry : flowsByBucket.entrySet()) { for (final FlowMetadata flowMetadata : entry.getValue()) { - flowsBdId.put(flowMetadata.getIdentifier(), flowMetadata); + flowsById.put(flowMetadata.getIdentifier(), flowMetadata); + } + } + + return flowsById; + } + + private Map<String,FlowMetadata> createFlowsByName(final Map<String,Set<FlowMetadata>> flowsByBucket) { + final Map<String,FlowMetadata> flowsByName = new HashMap<>(); + + for (final Map.Entry<String,Set<FlowMetadata>> entry : flowsByBucket.entrySet()) { + for (final FlowMetadata flow : entry.getValue()) { + flowsByName.put(flow.getName().toLowerCase(), flow); } } - return flowsBdId; + return flowsByName; } public Metadata getMetadata() { return metadata; } - public Map<String,BucketMetadata> getBucketsBydId() { + public Map<String,BucketMetadata> getBucketsById() { return bucketsById; } + public Map<String,BucketMetadata> getBucketsByName() { + return bucketsByName; + } + public Map<String,FlowMetadata> getFlowsById() { return flowsById; } + public Map<String,FlowMetadata> getFlowsByName() { + return flowsByName; + } + public Map<String,Set<FlowMetadata>> getFlowsByBucket() { return flowsByBucket; } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java index 753ca6e..5c823b6 100644 --- a/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java +++ b/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java @@ -111,15 +111,15 @@ public class TestFileSystemMetadataProvider { assertEquals(2, metadataProvider.getBuckets().size()); assertEquals(1, metadataProvider.getFlows().size()); - final BucketMetadata bucket1 = metadataProvider.getBucket("bucket1"); + final BucketMetadata bucket1 = metadataProvider.getBucketById("bucket1"); assertNotNull(bucket1); assertEquals("bucket1", bucket1.getIdentifier()); - final BucketMetadata bucket2 = metadataProvider.getBucket("bucket2"); + final BucketMetadata bucket2 = metadataProvider.getBucketById("bucket2"); assertNotNull(bucket2); assertEquals("bucket2", bucket2.getIdentifier()); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1"); + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1"); assertNotNull(flowMetadata); assertEquals("flow1", flowMetadata.getIdentifier()); } @@ -150,11 +150,11 @@ public class TestFileSystemMetadataProvider { } @Test - public void testGetBucketExists() { + public void testGetBucketByIdExists() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); assertEquals(2, metadataProvider.getBuckets().size()); - final BucketMetadata bucket1 = metadataProvider.getBucket("bucket1"); + final BucketMetadata bucket1 = metadataProvider.getBucketById("bucket1"); assertNotNull(bucket1); assertEquals("bucket1", bucket1.getIdentifier()); assertEquals("Bryan's Bucket", bucket1.getName()); @@ -163,11 +163,46 @@ public class TestFileSystemMetadataProvider { } @Test - public void testGetBucketDoesNotExist() { + public void testGetBucketByIdDoesNotExist() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); assertEquals(2, metadataProvider.getBuckets().size()); - final BucketMetadata bucket1 = metadataProvider.getBucket("bucket-does-not-exist"); + final BucketMetadata bucket1 = metadataProvider.getBucketById("bucket-does-not-exist"); + assertNull(bucket1); + } + + @Test + public void testGetBucketByNameExists() { + metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); + assertEquals(2, metadataProvider.getBuckets().size()); + + final BucketMetadata bucket1 = metadataProvider.getBucketByName("Bryan's Bucket"); + assertNotNull(bucket1); + assertEquals("bucket1", bucket1.getIdentifier()); + assertEquals("Bryan's Bucket", bucket1.getName()); + assertEquals("The description for Bryan's Bucket.", bucket1.getDescription()); + assertEquals(111111111, bucket1.getCreatedTimestamp()); + } + + @Test + public void testGetBucketByNameCaseInsensitive() { + metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); + assertEquals(2, metadataProvider.getBuckets().size()); + + final BucketMetadata bucket1 = metadataProvider.getBucketByName("bryan's bucket"); + assertNotNull(bucket1); + assertEquals("bucket1", bucket1.getIdentifier()); + assertEquals("Bryan's Bucket", bucket1.getName()); + assertEquals("The description for Bryan's Bucket.", bucket1.getDescription()); + assertEquals(111111111, bucket1.getCreatedTimestamp()); + } + + @Test + public void testGetBucketByNameDoesNotExist() { + metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); + assertEquals(2, metadataProvider.getBuckets().size()); + + final BucketMetadata bucket1 = metadataProvider.getBucketByName("bucket-does-not-exist"); assertNull(bucket1); } @@ -176,7 +211,7 @@ public class TestFileSystemMetadataProvider { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); assertEquals(2, metadataProvider.getBuckets().size()); - final BucketMetadata bucket = metadataProvider.getBucket("bucket1"); + final BucketMetadata bucket = metadataProvider.getBucketById("bucket1"); assertNotNull(bucket); final BucketMetadata updatedBucket = new StandardBucketMetadata.Builder(bucket) @@ -212,14 +247,14 @@ public class TestFileSystemMetadataProvider { assertEquals(2, metadataProvider.getBuckets().size()); final String bucketId = "bucket1"; - assertNotNull(metadataProvider.getBucket(bucketId)); + assertNotNull(metadataProvider.getBucketById(bucketId)); final Set<FlowMetadata> bucketFlows = metadataProvider.getFlows(bucketId); assertNotNull(bucketFlows); assertEquals(1, bucketFlows.size()); metadataProvider.deleteBucket(bucketId); - assertNull(metadataProvider.getBucket(bucketId)); + assertNull(metadataProvider.getBucketById(bucketId)); final Set<FlowMetadata> bucketFlows2 = metadataProvider.getFlows(bucketId); assertNotNull(bucketFlows2); @@ -243,7 +278,7 @@ public class TestFileSystemMetadataProvider { assertEquals(2, metadataProvider.getBuckets().size()); // verify bucket2 exists and has no flows - final BucketMetadata bucket2 = metadataProvider.getBucket("bucket2"); + final BucketMetadata bucket2 = metadataProvider.getBucketById("bucket2"); assertNotNull(bucket2); assertEquals(0, metadataProvider.getFlows(bucket2.getIdentifier()).size()); @@ -268,10 +303,10 @@ public class TestFileSystemMetadataProvider { } @Test - public void testGetFlowExists() { + public void testGetFlowByIdExists() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1"); + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1"); assertNotNull(flowMetadata); assertEquals("flow1", flowMetadata.getIdentifier()); assertEquals("Bryan's Flow", flowMetadata.getName()); @@ -283,10 +318,46 @@ public class TestFileSystemMetadataProvider { } @Test - public void testGetFlowDoesNotExist() { + public void testGetFlowByIdDoesNotExist() { + metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); + + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow-does-not-exist"); + assertNull(flowMetadata); + } + + @Test + public void testGetFlowByNameExists() { + metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); + + final FlowMetadata flowMetadata = metadataProvider.getFlowByName("Bryan's Flow"); + assertNotNull(flowMetadata); + assertEquals("flow1", flowMetadata.getIdentifier()); + assertEquals("Bryan's Flow", flowMetadata.getName()); + assertEquals("The description for Bryan's Flow.", flowMetadata.getDescription()); + assertEquals(333333333, flowMetadata.getCreatedTimestamp()); + assertEquals(444444444, flowMetadata.getModifiedTimestamp()); + assertEquals(3, flowMetadata.getSnapshotMetadata().size()); + } + + @Test + public void testGetFlowByNameCaseInsensitive() { + metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); + + final FlowMetadata flowMetadata = metadataProvider.getFlowByName("bryan's flow"); + assertNotNull(flowMetadata); + assertEquals("flow1", flowMetadata.getIdentifier()); + assertEquals("Bryan's Flow", flowMetadata.getName()); + assertEquals("The description for Bryan's Flow.", flowMetadata.getDescription()); + assertEquals(333333333, flowMetadata.getCreatedTimestamp()); + assertEquals(444444444, flowMetadata.getModifiedTimestamp()); + assertEquals(3, flowMetadata.getSnapshotMetadata().size()); + } + + @Test + public void testGetFlowByNameDoesNotExist() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow-does-not-exist"); + final FlowMetadata flowMetadata = metadataProvider.getFlowByName("flow-does-not-exist"); assertNull(flowMetadata); } @@ -294,7 +365,7 @@ public class TestFileSystemMetadataProvider { public void testUpdateFlowExists() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1"); + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1"); assertNotNull(flowMetadata); final String newFlowName = "New Flow Name"; @@ -334,14 +405,14 @@ public class TestFileSystemMetadataProvider { public void testDeleteFlowWithSnapshots() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1"); + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1"); assertNotNull(flowMetadata); assertNotNull(flowMetadata.getSnapshotMetadata()); assertTrue(flowMetadata.getSnapshotMetadata().size() > 0); metadataProvider.deleteFlow(flowMetadata.getIdentifier()); - final FlowMetadata deletedFlow = metadataProvider.getFlow("flow1"); + final FlowMetadata deletedFlow = metadataProvider.getFlowById("flow1"); assertNull(deletedFlow); } @@ -358,7 +429,7 @@ public class TestFileSystemMetadataProvider { public void testCreateFlowSnapshot() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata versionedFlow = metadataProvider.getFlow("flow1"); + final FlowMetadata versionedFlow = metadataProvider.getFlowById("flow1"); assertNotNull(versionedFlow); assertNotNull(versionedFlow.getSnapshotMetadata()); @@ -384,7 +455,7 @@ public class TestFileSystemMetadataProvider { assertEquals(nextSnapshot.getComments(), createdSnapshot.getComments()); assertEquals(nextSnapshot.getCreatedTimestamp(), createdSnapshot.getCreatedTimestamp()); - final FlowMetadata updatedFlow = metadataProvider.getFlow("flow1"); + final FlowMetadata updatedFlow = metadataProvider.getFlowById("flow1"); assertNotNull(updatedFlow); assertNotNull(updatedFlow.getSnapshotMetadata()); assertEquals(updatedFlow.getSnapshotMetadata().size(), versionedFlow.getSnapshotMetadata().size() + 1); @@ -439,7 +510,7 @@ public class TestFileSystemMetadataProvider { public void testDeleteSnapshotExists() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1"); + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1"); assertNotNull(flowMetadata); assertNotNull(flowMetadata.getSnapshotMetadata()); assertEquals(3, flowMetadata.getSnapshotMetadata().size()); @@ -449,7 +520,7 @@ public class TestFileSystemMetadataProvider { metadataProvider.deleteFlowSnapshot(flowMetadata.getIdentifier(), firstSnapshot.getVersion()); - final FlowMetadata updatedFlow = metadataProvider.getFlow("flow1"); + final FlowMetadata updatedFlow = metadataProvider.getFlowById("flow1"); assertNotNull(updatedFlow); assertNotNull(updatedFlow.getSnapshotMetadata()); assertEquals(2, updatedFlow.getSnapshotMetadata().size()); @@ -463,14 +534,14 @@ public class TestFileSystemMetadataProvider { public void testDeleteSnapshotDoesNotExist() { metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING)); - final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1"); + final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1"); assertNotNull(flowMetadata); assertNotNull(flowMetadata.getSnapshotMetadata()); assertEquals(3, flowMetadata.getSnapshotMetadata().size()); metadataProvider.deleteFlowSnapshot(flowMetadata.getIdentifier(), Integer.MAX_VALUE); - final FlowMetadata updatedFlow = metadataProvider.getFlow("flow1"); + final FlowMetadata updatedFlow = metadataProvider.getFlowById("flow1"); assertNotNull(updatedFlow); assertNotNull(updatedFlow.getSnapshotMetadata()); assertEquals(3, updatedFlow.getSnapshotMetadata().size()); http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-registry-web-api/pom.xml b/nifi-registry-web-api/pom.xml index b1a7f07..74c6ba3 100644 --- a/nifi-registry-web-api/pom.xml +++ b/nifi-registry-web-api/pom.xml @@ -85,11 +85,6 @@ <dependencies> <dependency> <groupId>org.apache.nifi.registry</groupId> - <artifactId>nifi-registry-data-model</artifactId> - <version>0.0.1-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.nifi.registry</groupId> <artifactId>nifi-registry-properties</artifactId> <version>0.0.1-SNAPSHOT</version> <scope>provided</scope> @@ -126,9 +121,12 @@ <artifactId>jersey-media-json-jackson</artifactId> </dependency> <dependency> + <groupId>org.glassfish.jersey.ext</groupId> + <artifactId>jersey-bean-validation</artifactId> + </dependency> + <dependency> <groupId>io.swagger</groupId> <artifactId>swagger-annotations</artifactId> - <version>1.5.16</version> </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java ---------------------------------------------------------------------- diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java index e2fabdc..02ece9d 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java @@ -17,19 +17,32 @@ package org.apache.nifi.registry.web; import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.metadata.MetadataProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.apache.nifi.registry.provider.ProviderFactory; import org.apache.nifi.registry.provider.StandardProviderFactory; +import org.apache.nifi.registry.serialization.FlowSnapshotSerializer; +import org.apache.nifi.registry.serialization.Serializer; +import org.apache.nifi.registry.service.RegistryService; +import org.apache.nifi.registry.web.api.BucketFlowResource; +import org.apache.nifi.registry.web.api.BucketResource; +import org.apache.nifi.registry.web.api.FlowResource; import org.apache.nifi.registry.web.api.TestResource; import org.apache.nifi.registry.web.mapper.IllegalArgumentExceptionMapper; +import org.apache.nifi.registry.web.mapper.IllegalStateExceptionMapper; +import org.apache.nifi.registry.web.mapper.ResourceNotFoundExceptionMapper; import org.apache.nifi.registry.web.mapper.ThrowableMapper; import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.server.ServerProperties; import org.glassfish.jersey.server.filter.HttpMethodOverrideFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.ServletContext; +import javax.validation.Validation; +import javax.validation.Validator; +import javax.validation.ValidatorFactory; import javax.ws.rs.core.Context; public class NiFiRegistryResourceConfig extends ResourceConfig { @@ -39,17 +52,37 @@ public class NiFiRegistryResourceConfig extends ResourceConfig { public NiFiRegistryResourceConfig(@Context ServletContext servletContext) { final NiFiRegistryProperties properties = (NiFiRegistryProperties) servletContext.getAttribute("nifi-registry.properties"); + // create the providers final ProviderFactory providerFactory = new StandardProviderFactory(properties); + providerFactory.initialize(); + final MetadataProvider metadataProvider = providerFactory.getMetadataProvider(); final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider(); + // create a serializer for flow snapshots + final Serializer<VersionedFlowSnapshot> snapshotSerializer = new FlowSnapshotSerializer(); + + // create a validator + final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); + final Validator validator = validatorFactory.getValidator(); + + // create the main services that the REST resources will use + final RegistryService registryService = new RegistryService(metadataProvider, flowPersistenceProvider, snapshotSerializer, validator); + register(HttpMethodOverrideFilter.class); // register the exception mappers register(new IllegalArgumentExceptionMapper()); + register(new IllegalStateExceptionMapper()); + register(new ResourceNotFoundExceptionMapper()); register(new ThrowableMapper()); // register endpoints register(new TestResource(metadataProvider, flowPersistenceProvider)); + register(new BucketResource(registryService)); + register(new BucketFlowResource(registryService)); + register(new FlowResource(registryService)); + + property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true); } } http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java index fc838fd..218205b 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java @@ -19,6 +19,7 @@ package org.apache.nifi.registry.web.api; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.service.RegistryService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,12 @@ public class BucketFlowResource { private static final Logger logger = LoggerFactory.getLogger(BucketFlowResource.class); + private final RegistryService registryService; + + public BucketFlowResource(final RegistryService registryService) { + this.registryService = registryService; + } + @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @@ -47,10 +54,9 @@ public class BucketFlowResource { "The flow id is created by the server and a location URI for the created flow resource is returned.", response = VersionedFlow.class ) - public Response createFlow(@PathParam("bucketId") String bucketId) { - // TODO implement createFlow - logger.error("This API functionality has not yet been implemented."); - return Response.status(Response.Status.NOT_IMPLEMENTED).build(); + public Response createFlow(@PathParam("bucketId") final String bucketId, final VersionedFlow flow) { + final VersionedFlow createdFlow = registryService.createFlow(bucketId, flow); + return Response.status(Response.Status.OK).entity(createdFlow).build(); } /* TODO, add redirection URIs so that GET, PUT, DELETE operations for a given flow id (once created) http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java ---------------------------------------------------------------------- diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java index 25f62f5..797ea9e 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java @@ -20,7 +20,9 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.service.RegistryService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +38,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; +import java.util.Set; @Path("/buckets") @Api( @@ -50,6 +53,12 @@ public class BucketResource { @Context UriInfo uriInfo; + private final RegistryService registryService; + + public BucketResource(final RegistryService registryService) { + this.registryService = registryService; + } + @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @@ -57,10 +66,9 @@ public class BucketResource { value = "Create a named bucket capable of storing NiFi bucket objects such as flows and extension bundles.", response = Bucket.class ) - public Response createBucket(Bucket bucket) { - // TODO implement createBucket - logger.error("This API functionality has not yet been implemented."); - return Response.status(Response.Status.NOT_IMPLEMENTED).build(); + public Response createBucket(final Bucket bucket) { + final Bucket createdBucket = registryService.createBucket(bucket); + return Response.status(Response.Status.OK).entity(createdBucket).build(); } @GET @@ -72,9 +80,8 @@ public class BucketResource { responseContainer = "List" ) public Response getBuckets() { - // TODO implement getBuckets - logger.error("This API functionality has not yet been implemented."); - return Response.status(Response.Status.NOT_IMPLEMENTED).build(); + final Set<Bucket> buckets = registryService.getBuckets(); + return Response.status(Response.Status.OK).entity(buckets).build(); } @GET @@ -90,10 +97,9 @@ public class BucketResource { @ApiResponse(code = 404, message = "The specified resource could not be found."), } ) - public Response getBucket(@PathParam("bucketId") String bucketId) { - // TODO implement getBucket - logger.error("This API functionality has not yet been implemented."); - return Response.status(Response.Status.NOT_IMPLEMENTED).build(); + public Response getBucket(@PathParam("bucketId") final String bucketId) { + final Bucket bucket = registryService.getBucket(bucketId); + return Response.status(Response.Status.OK).entity(bucket).build(); } @PUT @@ -109,10 +115,25 @@ public class BucketResource { @ApiResponse(code = 404, message = "The specified resource could not be found."), } ) - public Response updateBucket(@PathParam("bucketId") String bucketId) { - // TODO implement updateBucket - logger.error("This API functionality has not yet been implemented."); - return Response.status(Response.Status.NOT_IMPLEMENTED).build(); + public Response updateBucket(@PathParam("bucketId") final String bucketId, final Bucket bucket) { + if (StringUtils.isBlank(bucketId)) { + throw new IllegalArgumentException("Bucket Id cannot be blank"); + } + + if (bucket == null) { + throw new IllegalArgumentException("Bucket cannot be null"); + } + + if (bucket.getIdentifier() != null && !bucketId.equals(bucket.getIdentifier())) { + throw new IllegalArgumentException("Bucket id in path param must match bucket id in body"); + } + + if (bucket.getIdentifier() == null) { + bucket.setIdentifier(bucketId); + } + + final Bucket updatedBucket = registryService.updateBucket(bucket); + return Response.status(Response.Status.OK).entity(updatedBucket).build(); } @DELETE @@ -128,10 +149,9 @@ public class BucketResource { @ApiResponse(code = 404, message = "The specified resource could not be found."), } ) - public Response deleteBucket(@PathParam("bucketId") String bucketId) { - // TODO implement deleteBucket - logger.error("This API functionality has not yet been implemented."); - return Response.status(Response.Status.NOT_IMPLEMENTED).build(); + public Response deleteBucket(@PathParam("bucketId") final String bucketId) { + final Bucket deletedBucket = registryService.deleteBucket(bucketId); + return Response.status(Response.Status.OK).entity(deletedBucket).build(); } }
