Repository: nifi Updated Branches: refs/heads/master febe6da4a -> 897c70298
http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java new file mode 100644 index 0000000..6c21d91 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java @@ -0,0 +1,827 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.storage; + +import com.google.cloud.Page; +import com.google.cloud.storage.Acl; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.LogMessage; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_ATTR; +import static 
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ListGCSBucket} which do not consume Google Cloud resources. 
+ */ +public class ListGCSBucketTest extends AbstractGCSTest { + private static final String PREFIX = "test-prefix"; + private static final Boolean USE_GENERATIONS = true; + + private static final Long SIZE = 100L; + private static final String CACHE_CONTROL = "test-cache-control"; + private static final Integer COMPONENT_COUNT = 3; + private static final String CONTENT_ENCODING = "test-content-encoding"; + private static final String CONTENT_LANGUAGE = "test-content-language"; + private static final String CONTENT_TYPE = "test-content-type"; + private static final String CRC32C = "test-crc32c"; + private static final String ENCRYPTION = "test-encryption"; + private static final String ENCRYPTION_SHA256 = "test-encryption-256"; + private static final String ETAG = "test-etag"; + private static final String GENERATED_ID = "test-generated-id"; + private static final String MD5 = "test-md5"; + private static final String MEDIA_LINK = "test-media-link"; + private static final Long METAGENERATION = 42L; + private static final String OWNER_USER_EMAIL = "test-owner-user-email"; + private static final String OWNER_GROUP_EMAIL = "test-owner-group-email"; + private static final String OWNER_DOMAIN = "test-owner-domain"; + private static final String OWNER_PROJECT_ID = "test-owner-project-id"; + private static final String URI = "test-uri"; + private static final String CONTENT_DISPOSITION = "attachment; filename=\"test-content-disposition.txt\""; + private static final Long CREATE_TIME = 1234L; + private static final Long UPDATE_TIME = 4567L; + private final static Long GENERATION = 5L; + + @Mock + Storage storage; + + @Captor + ArgumentCaptor<Storage.BlobListOption> argumentCaptor; + + @Override + public ListGCSBucket getProcessor() { + return new ListGCSBucket() { + @Override + protected Storage getCloudService() { + return storage; + } + }; + } + + @Override + protected void addRequiredPropertiesToRunner(TestRunner runner) { + runner.setProperty(ListGCSBucket.BUCKET, 
BUCKET); + } + + @Test + public void testRestoreFreshState() throws Exception { + reset(storage); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + assertEquals("Cluster StateMap should be fresh (version -1L)", + -1L, + runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion() + ); + + assertNull(processor.currentKeys); + + processor.restoreState(runner.getProcessContext()); + + assertNotNull(processor.currentKeys); + assertEquals( + 0L, + processor.currentTimestamp + ); + + assertTrue(processor.currentKeys.isEmpty()); + + } + + @Test + public void testRestorePreviousState() throws Exception { + reset(storage); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Map<String, String> state = ImmutableMap.of( + ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L), + ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0", + ListGCSBucket.CURRENT_KEY_PREFIX + "1", "test-key-1" + ); + + runner.getStateManager().setState(state, Scope.CLUSTER); + + assertNull(processor.currentKeys); + assertEquals( + 0L, + processor.currentTimestamp + ); + + processor.restoreState(runner.getProcessContext()); + + assertNotNull(processor.currentKeys); + assertTrue(processor.currentKeys.contains("test-key-0")); + assertTrue(processor.currentKeys.contains("test-key-1")); + assertEquals( + 4L, + processor.currentTimestamp + ); + } + + @Test + public void testPersistState() throws Exception { + reset(storage); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + assertEquals("Cluster StateMap should be fresh (version -1L)", + -1L, + 
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion() + ); + + processor.currentKeys = ImmutableSet.of( + "test-key-0", + "test-key-1" + ); + + processor.currentTimestamp = 4L; + + processor.persistState(runner.getProcessContext()); + + final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals( + "Cluster StateMap should have been written to", + 1L, + stateMap.getVersion() + ); + + assertEquals( + ImmutableMap.of( + ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L), + ListGCSBucket.CURRENT_KEY_PREFIX+"0", "test-key-0", + ListGCSBucket.CURRENT_KEY_PREFIX+"1", "test-key-1" + ), + stateMap.toMap() + ); + } + + @Test + public void testFailedPersistState() throws Exception { + reset(storage); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true); + + processor.currentKeys = ImmutableSet.of( + "test-key-0", + "test-key-1" + ); + + processor.currentTimestamp = 4L; + + assertTrue(runner.getLogger().getErrorMessages().isEmpty()); + + processor.persistState(runner.getProcessContext()); + + // The method should have caught the error and reported it to the logger. + final List<LogMessage> logMessages = runner.getLogger().getErrorMessages(); + assertFalse(logMessages.isEmpty()); + assertEquals( + 1, + logMessages.size() + ); + + // We could do more specific things like check the contents of the LogMessage, + // but that seems too nitpicky. 
+ + } + + @Mock + Page<Blob> mockBlobPages; + + private Blob buildMockBlob(String bucket, String key, long updateTime) { + final Blob blob = mock(Blob.class); + when(blob.getBucket()).thenReturn(bucket); + when(blob.getName()).thenReturn(key); + when(blob.getUpdateTime()).thenReturn(updateTime); + return blob; + } + + @Test + public void testSuccessfulList() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Iterable<Blob> mockList = ImmutableList.of( + buildMockBlob("blob-bucket-1", "blob-key-1", 2L), + buildMockBlob("blob-bucket-2", "blob-key-2", 3L) + ); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS); + runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2); + + final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS); + + MockFlowFile flowFile = successes.get(0); + assertEquals( + "blob-bucket-1", + flowFile.getAttribute(BUCKET_ATTR) + ); + + assertEquals( + "blob-key-1", + flowFile.getAttribute(KEY_ATTR) + ); + + assertEquals( + "2", + flowFile.getAttribute(UPDATE_TIME_ATTR) + ); + + flowFile = successes.get(1); + assertEquals( + "blob-bucket-2", + flowFile.getAttribute(BUCKET_ATTR) + ); + + assertEquals( + "blob-key-2", + flowFile.getAttribute(KEY_ATTR) + ); + + assertEquals( + "3", + flowFile.getAttribute(UPDATE_TIME_ATTR) + ); + + assertEquals( + 3L, + processor.currentTimestamp + ); + + assertEquals( + ImmutableSet.of( + "blob-key-2" + ), + processor.currentKeys + ); + + } + + @Test + public void testOldValues() throws Exception { + reset(storage, 
mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Iterable<Blob> mockList = ImmutableList.of( + buildMockBlob("blob-bucket-1", "blob-key-1", 2L) + ); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.enqueue("test2"); + runner.run(2); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS); + runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1); + + assertEquals( + "blob-key-1", + runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_KEY_PREFIX+"0") + ); + + assertEquals( + "2", + runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP) + ); + + } + + + + @Test + public void testEmptyList() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Iterable<Blob> mockList = ImmutableList.of(); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0); + + assertEquals( + "No state should be persisted on an empty return", + -1L, + runner.getStateManager().getState(Scope.CLUSTER).getVersion() + ); + } + + @Test + public void testAttributesSet() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + 
addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); + when(blob.getSize()).thenReturn(SIZE); + when(blob.getCacheControl()).thenReturn(CACHE_CONTROL); + when(blob.getComponentCount()).thenReturn(COMPONENT_COUNT); + when(blob.getContentEncoding()).thenReturn(CONTENT_ENCODING); + when(blob.getContentLanguage()).thenReturn(CONTENT_LANGUAGE); + when(blob.getContentType()).thenReturn(CONTENT_TYPE); + when(blob.getCrc32c()).thenReturn(CRC32C); + + final BlobInfo.CustomerEncryption mockEncryption = mock(BlobInfo.CustomerEncryption.class); + when(mockEncryption.getEncryptionAlgorithm()).thenReturn(ENCRYPTION); + when(mockEncryption.getKeySha256()).thenReturn(ENCRYPTION_SHA256); + when(blob.getCustomerEncryption()).thenReturn(mockEncryption); + + when(blob.getEtag()).thenReturn(ETAG); + when(blob.getGeneratedId()).thenReturn(GENERATED_ID); + when(blob.getGeneration()).thenReturn(GENERATION); + when(blob.getMd5()).thenReturn(MD5); + when(blob.getMediaLink()).thenReturn(MEDIA_LINK); + when(blob.getMetageneration()).thenReturn(METAGENERATION); + when(blob.getSelfLink()).thenReturn(URI); + when(blob.getContentDisposition()).thenReturn(CONTENT_DISPOSITION); + when(blob.getCreateTime()).thenReturn(CREATE_TIME); + when(blob.getUpdateTime()).thenReturn(UPDATE_TIME); + + final Iterable<Blob> mockList = ImmutableList.of(blob); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + + runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS); + runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0); + assertEquals( + CACHE_CONTROL, + flowFile.getAttribute(CACHE_CONTROL_ATTR) + ); + + 
assertEquals( + COMPONENT_COUNT, + Integer.valueOf(flowFile.getAttribute(COMPONENT_COUNT_ATTR)) + ); + + assertEquals( + CONTENT_ENCODING, + flowFile.getAttribute(CONTENT_ENCODING_ATTR) + ); + + assertEquals( + CONTENT_LANGUAGE, + flowFile.getAttribute(CONTENT_LANGUAGE_ATTR) + ); + + assertEquals( + CONTENT_TYPE, + flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()) + ); + + assertEquals( + CRC32C, + flowFile.getAttribute(CRC32C_ATTR) + ); + + assertEquals( + ENCRYPTION, + flowFile.getAttribute(ENCRYPTION_ALGORITHM_ATTR) + ); + + assertEquals( + ENCRYPTION_SHA256, + flowFile.getAttribute(ENCRYPTION_SHA256_ATTR) + ); + + assertEquals( + ETAG, + flowFile.getAttribute(ETAG_ATTR) + ); + + assertEquals( + GENERATED_ID, + flowFile.getAttribute(GENERATED_ID_ATTR) + ); + + assertEquals( + GENERATION, + Long.valueOf(flowFile.getAttribute(GENERATION_ATTR)) + ); + + assertEquals( + MD5, + flowFile.getAttribute(MD5_ATTR) + ); + + assertEquals( + MEDIA_LINK, + flowFile.getAttribute(MEDIA_LINK_ATTR) + ); + + assertEquals( + METAGENERATION, + Long.valueOf(flowFile.getAttribute(METAGENERATION_ATTR)) + ); + + assertEquals( + URI, + flowFile.getAttribute(URI_ATTR) + ); + + assertEquals( + CONTENT_DISPOSITION, + flowFile.getAttribute(CONTENT_DISPOSITION_ATTR) + ); + + assertEquals( + CREATE_TIME, + Long.valueOf(flowFile.getAttribute(CREATE_TIME_ATTR)) + ); + + assertEquals( + UPDATE_TIME, + Long.valueOf(flowFile.getAttribute(UPDATE_TIME_ATTR)) + ); + } + + @Test + public void testAclOwnerUser() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); + final Acl.User mockUser = mock(Acl.User.class); + when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL); + when(blob.getOwner()).thenReturn(mockUser); + + final Iterable<Blob> mockList = 
ImmutableList.of(blob); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + + runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS); + runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0); + assertEquals( + OWNER_USER_EMAIL, + flowFile.getAttribute(OWNER_ATTR) + ); + + assertEquals( + "user", + flowFile.getAttribute(OWNER_TYPE_ATTR) + ); + + } + + + @Test + public void testAclOwnerGroup() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); + final Acl.Group mockGroup = mock(Acl.Group.class); + when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL); + when(blob.getOwner()).thenReturn(mockGroup); + + final Iterable<Blob> mockList = ImmutableList.of(blob); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + + runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS); + runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0); + assertEquals( + OWNER_GROUP_EMAIL, + flowFile.getAttribute(OWNER_ATTR) + ); + + assertEquals( + "group", + flowFile.getAttribute(OWNER_TYPE_ATTR) + ); + + } + + + + @Test + public void testAclOwnerDomain() throws Exception { + reset(storage, 
mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); + final Acl.Domain mockDomain = mock(Acl.Domain.class); + when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN); + when(blob.getOwner()).thenReturn(mockDomain); + + final Iterable<Blob> mockList = ImmutableList.of(blob); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + + runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS); + runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0); + assertEquals( + OWNER_DOMAIN, + flowFile.getAttribute(OWNER_ATTR) + ); + + assertEquals( + "domain", + flowFile.getAttribute(OWNER_TYPE_ATTR) + ); + + } + + + + @Test + public void testAclOwnerProject() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L); + final Acl.Project mockProject = mock(Acl.Project.class); + when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID); + when(blob.getOwner()).thenReturn(mockProject); + + final Iterable<Blob> mockList = ImmutableList.of(blob); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + + 
runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS); + runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0); + assertEquals( + OWNER_PROJECT_ID, + flowFile.getAttribute(OWNER_ATTR) + ); + + assertEquals( + "project", + flowFile.getAttribute(OWNER_TYPE_ATTR) + ); + + } + + + @Test + public void testYieldOnBadStateRestore() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + final Iterable<Blob> mockList = ImmutableList.of(); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), any(Storage.BlobListOption[].class))) + .thenReturn(mockBlobPages); + + runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true); + runner.enqueue("test"); + runner.run(); + + runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0); + assertEquals( + 1, + runner.getLogger().getErrorMessages().size() + ); + } + + @Test + public void testListOptionsPrefix() throws Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + + runner.setProperty( + ListGCSBucket.PREFIX, + PREFIX + ); + + runner.assertValid(); + + final Iterable<Blob> mockList = ImmutableList.of(); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), argumentCaptor.capture())) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + assertEquals( + Storage.BlobListOption.prefix(PREFIX), + argumentCaptor.getValue() + ); + + } + + + @Test + public void testListOptionsVersions() throws 
Exception { + reset(storage, mockBlobPages); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + + runner.setProperty( + ListGCSBucket.USE_GENERATIONS, + String.valueOf(USE_GENERATIONS) + ); + runner.assertValid(); + + final Iterable<Blob> mockList = ImmutableList.of(); + + when(mockBlobPages.getValues()) + .thenReturn(mockList); + + when(mockBlobPages.getNextPage()).thenReturn(null); + + when(storage.list(anyString(), argumentCaptor.capture())) + .thenReturn(mockBlobPages); + + runner.enqueue("test"); + runner.run(); + + Storage.BlobListOption option = argumentCaptor.getValue(); + + assertEquals( + Storage.BlobListOption.versions(true), + option + ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectIT.java new file mode 100644 index 0000000..9c14013 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.storage; + +import com.google.cloud.storage.Acl; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.Test; + +import java.util.Map; + +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link PutGCSObject} which actually use Google Cloud resources. + */ +public class PutGCSObjectIT extends AbstractGCSIT { + private static final String KEY = "delete-me"; + private static final byte[] CONTENT = {12, 13, 14}; + + @Test + public void testSimplePut() throws Exception { + final TestRunner runner = buildNewRunner(new PutGCSObject()); + runner.setProperty(PutGCSObject.BUCKET, BUCKET); + runner.setProperty(PutGCSObject.KEY, KEY); + + runner.enqueue(CONTENT); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS, 1); + assertTrue(fileEquals(KEY, CONTENT)); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS).get(0); + flowFile.assertAttributeNotExists(ENCRYPTION_ALGORITHM_ATTR); + + for (Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { + System.out.println(entry.getKey() + ":" + entry.getValue()); + } + } + + @Test + public void testEncryptedPut() throws Exception { + final TestRunner runner = buildNewRunner(new PutGCSObject()); + runner.setProperty(PutGCSObject.BUCKET, 
BUCKET); + runner.setProperty(PutGCSObject.KEY, KEY); + runner.setProperty(PutGCSObject.ENCRYPTION_KEY, ENCRYPTION_KEY); + + runner.enqueue(CONTENT); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS, 1); + assertTrue(fileEqualsEncrypted(KEY, CONTENT)); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS).get(0); + flowFile.assertAttributeExists(ENCRYPTION_ALGORITHM_ATTR); + + for (Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { + System.out.println(entry.getKey() + ":" + entry.getValue()); + } + } + + @Test + public void testPutWithAcl() throws Exception { + final TestRunner runner = buildNewRunner(new PutGCSObject()); + runner.setProperty(PutGCSObject.BUCKET, BUCKET); + runner.setProperty(PutGCSObject.KEY, KEY); + runner.setProperty(PutGCSObject.ACL, PutGCSObject.ACL_BUCKET_OWNER_READ); + + runner.enqueue(CONTENT); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS, 1); + assertTrue(fileEquals(KEY, CONTENT)); + + final Blob blob = storage.get(BlobId.of(BUCKET, KEY)); + + boolean userIsOwner = false; + boolean projectOwnerIsReader = false; + for (Acl acl : blob.listAcls()) { + if (acl.getEntity().getType() == Acl.Entity.Type.USER + && acl.getRole() == Acl.Role.OWNER) { + userIsOwner = true; + } + + if (acl.getEntity().getType() == Acl.Entity.Type.PROJECT + && acl.getRole() == Acl.Role.READER) { + projectOwnerIsReader = true; + } + } + + assertTrue(userIsOwner); + assertTrue(projectOwnerIsReader); + } + + @Test + public void testPutWithOverwrite() throws Exception { + final TestRunner runner = buildNewRunner(new PutGCSObject()); + runner.setProperty(PutGCSObject.BUCKET, BUCKET); + runner.setProperty(PutGCSObject.KEY, KEY); + + putTestFile(KEY, new byte[]{1, 2}); + + runner.enqueue(CONTENT); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS, 1); + assertTrue(fileEquals(KEY, CONTENT)); + + } + + + 
@Test + public void testPutWithNoOverwrite() throws Exception { + final TestRunner runner = buildNewRunner(new PutGCSObject()); + runner.setProperty(PutGCSObject.BUCKET, BUCKET); + runner.setProperty(PutGCSObject.KEY, KEY); + runner.setProperty(PutGCSObject.OVERWRITE, "false"); + + putTestFile(KEY, new byte[]{1, 2}); + + runner.enqueue(CONTENT); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_FAILURE, 1); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java new file mode 100644 index 0000000..4b6fb9a --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.storage; + +import com.google.cloud.storage.Acl; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + +import java.io.InputStream; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_READ; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_ATTR; 
+import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR; +import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link PutGCSObject} which do not use Google Cloud resources. 
+ */ +public class PutGCSObjectTest extends AbstractGCSTest { + private static final String FILENAME = "test-filename"; + private static final String KEY = "test-key"; + private static final String CONTENT_TYPE = "test-content-type"; + private static final String MD5 = "test-md5"; + private static final String CRC32C = "test-crc32c"; + private static final Storage.PredefinedAcl ACL = BUCKET_OWNER_READ; + private static final String ENCRYPTION_KEY = "test-encryption-key"; + private static final Boolean OVERWRITE = false; + private static final String CONTENT_DISPOSITION_TYPE = "inline"; + + + private static final Long SIZE = 100L; + private static final String CACHE_CONTROL = "test-cache-control"; + private static final Integer COMPONENT_COUNT = 3; + private static final String CONTENT_ENCODING = "test-content-encoding"; + private static final String CONTENT_LANGUAGE = "test-content-language"; + private static final String ENCRYPTION = "test-encryption"; + private static final String ENCRYPTION_SHA256 = "test-encryption-256"; + private static final String ETAG = "test-etag"; + private static final String GENERATED_ID = "test-generated-id"; + private static final String MEDIA_LINK = "test-media-link"; + private static final Long METAGENERATION = 42L; + private static final String OWNER_USER_EMAIL = "test-owner-user-email"; + private static final String OWNER_GROUP_EMAIL = "test-owner-group-email"; + private static final String OWNER_DOMAIN = "test-owner-domain"; + private static final String OWNER_PROJECT_ID = "test-owner-project-id"; + private static final String URI = "test-uri"; + private static final String CONTENT_DISPOSITION = "attachment; filename=\"" + FILENAME + "\""; + private static final Long CREATE_TIME = 1234L; + private static final Long UPDATE_TIME = 4567L; + private final static Long GENERATION = 5L; + + @Mock + Storage storage; + + @Mock + Blob blob; + + @Captor + ArgumentCaptor<Storage.BlobWriteOption> blobWriteOptionArgumentCaptor; + + @Captor + 
ArgumentCaptor<InputStream> inputStreamArgumentCaptor; + + @Captor + ArgumentCaptor<BlobInfo> blobInfoArgumentCaptor; + + @Override + public PutGCSObject getProcessor() { + return new PutGCSObject() { + @Override + protected Storage getCloudService() { + return storage; + } + }; + } + + @Override + protected void addRequiredPropertiesToRunner(TestRunner runner) { + runner.setProperty(PutGCSObject.BUCKET, BUCKET); + } + + @Test + public void testSuccessfulPutOperationNoParameters() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + + runner.assertValid(); + + when(storage.create(blobInfoArgumentCaptor.capture(), + inputStreamArgumentCaptor.capture(), + blobWriteOptionArgumentCaptor.capture())).thenReturn(blob); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + /** Can't do this any more due to the switch to Java InputStreams which close after an operation **/ + /* + String text; + try (final Reader reader = new InputStreamReader(inputStreamArgumentCaptor.getValue())) { + text = CharStreams.toString(reader); + } + + assertEquals( + "FlowFile content should be equal to the Blob content", + "test", + text + ); + */ + + final List<Storage.BlobWriteOption> blobWriteOptions = blobWriteOptionArgumentCaptor.getAllValues(); + assertEquals("No BlobWriteOptions should be set", + 0, + blobWriteOptions.size()); + + final BlobInfo blobInfo = blobInfoArgumentCaptor.getValue(); + assertNull(blobInfo.getMd5()); + assertNull(blobInfo.getContentDisposition()); + assertNull(blobInfo.getCrc32c()); + } + + @Test + public void testSuccessfulPutOperation() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + 
addRequiredPropertiesToRunner(runner); + + runner.setProperty(PutGCSObject.KEY, KEY); + runner.setProperty(PutGCSObject.CONTENT_TYPE, CONTENT_TYPE); + runner.setProperty(PutGCSObject.MD5, MD5); + runner.setProperty(PutGCSObject.CRC32C, CRC32C); + runner.setProperty(PutGCSObject.ACL, ACL.name()); + runner.setProperty(PutGCSObject.ENCRYPTION_KEY, ENCRYPTION_KEY); + runner.setProperty(PutGCSObject.OVERWRITE, String.valueOf(OVERWRITE)); + runner.setProperty(PutGCSObject.CONTENT_DISPOSITION_TYPE, CONTENT_DISPOSITION_TYPE); + + runner.assertValid(); + + when(storage.create(blobInfoArgumentCaptor.capture(), + inputStreamArgumentCaptor.capture(), + blobWriteOptionArgumentCaptor.capture())).thenReturn(blob); + + runner.enqueue("test", ImmutableMap.of(CoreAttributes.FILENAME.key(), FILENAME)); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + /* + + String text; + try (final Reader reader = new InputStreamReader(inputStreamArgumentCaptor.getValue())) { + text = CharStreams.toString(reader); + } + + assertEquals( + "FlowFile content should be equal to the Blob content", + "test", + text + ); + + */ + + final BlobInfo blobInfo = blobInfoArgumentCaptor.getValue(); + assertEquals( + BUCKET, + blobInfo.getBucket() + ); + + assertEquals( + KEY, + blobInfo.getName() + ); + + assertEquals( + CONTENT_DISPOSITION_TYPE + "; filename=" + FILENAME, + blobInfo.getContentDisposition() + ); + + assertEquals( + CONTENT_TYPE, + blobInfo.getContentType() + ); + + assertEquals( + MD5, + blobInfo.getMd5() + ); + + assertEquals( + CRC32C, + blobInfo.getCrc32c() + ); + + assertNull(blobInfo.getMetadata()); + + final List<Storage.BlobWriteOption> blobWriteOptions = blobWriteOptionArgumentCaptor.getAllValues(); + final Set<Storage.BlobWriteOption> blobWriteOptionSet = ImmutableSet.copyOf(blobWriteOptions); + + assertEquals( + "Each of the BlobWriteOptions should be unique", + blobWriteOptions.size(), + 
blobWriteOptionSet.size() + ); + + assertTrue("The doesNotExist BlobWriteOption should be set if OVERWRITE is false", + blobWriteOptionSet.contains(Storage.BlobWriteOption.doesNotExist())); + assertTrue("The md5Match BlobWriteOption should be set if MD5 is non-null", + blobWriteOptionSet.contains(Storage.BlobWriteOption.md5Match())); + assertTrue("The crc32cMatch BlobWriteOption should be set if CRC32C is non-null", + blobWriteOptionSet.contains(Storage.BlobWriteOption.crc32cMatch())); + assertTrue("The predefinedAcl BlobWriteOption should be set if ACL is non-null", + blobWriteOptionSet.contains(Storage.BlobWriteOption.predefinedAcl(ACL))); + assertTrue("The encryptionKey BlobWriteOption should be set if ENCRYPTION_KEY is non-null", + blobWriteOptionSet.contains(Storage.BlobWriteOption.encryptionKey(ENCRYPTION_KEY))); + + } + + @Test + public void testSuccessfulPutOperationWithUserMetadata() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + + runner.setProperty( + "testMetadataKey1", "testMetadataValue1" + ); + runner.setProperty( + "testMetadataKey2", "testMetadataValue2" + ); + + runner.assertValid(); + + when(storage.create(blobInfoArgumentCaptor.capture(), + inputStreamArgumentCaptor.capture(), + blobWriteOptionArgumentCaptor.capture())).thenReturn(blob); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + + /* + String text; + try (final Reader reader = new InputStreamReader(inputStreamArgumentCaptor.getValue())) { + text = CharStreams.toString(reader); + } + + assertEquals( + "FlowFile content should be equal to the Blob content", + "test", + text + ); + + */ + + final BlobInfo blobInfo = blobInfoArgumentCaptor.getValue(); + final Map<String, String> metadata = blobInfo.getMetadata(); + + 
assertNotNull(metadata); + + assertEquals( + 2, + metadata.size() + ); + + assertEquals( + "testMetadataValue1", + metadata.get("testMetadataKey1") + ); + + assertEquals( + "testMetadataValue2", + metadata.get("testMetadataKey2") + ); + } + + @Test + public void testAttributesSetOnSuccessfulPut() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class))) + .thenReturn(blob); + + when(blob.getBucket()).thenReturn(BUCKET); + when(blob.getName()).thenReturn(KEY); + when(blob.getSize()).thenReturn(SIZE); + when(blob.getCacheControl()).thenReturn(CACHE_CONTROL); + when(blob.getComponentCount()).thenReturn(COMPONENT_COUNT); + when(blob.getContentDisposition()).thenReturn(CONTENT_DISPOSITION); + when(blob.getContentEncoding()).thenReturn(CONTENT_ENCODING); + when(blob.getContentLanguage()).thenReturn(CONTENT_LANGUAGE); + when(blob.getContentType()).thenReturn(CONTENT_TYPE); + when(blob.getCrc32c()).thenReturn(CRC32C); + + final BlobInfo.CustomerEncryption mockEncryption = mock(BlobInfo.CustomerEncryption.class); + when(blob.getCustomerEncryption()).thenReturn(mockEncryption); + when(mockEncryption.getEncryptionAlgorithm()).thenReturn(ENCRYPTION); + when(mockEncryption.getKeySha256()).thenReturn(ENCRYPTION_SHA256); + when(blob.getEtag()).thenReturn(ETAG); + when(blob.getGeneratedId()).thenReturn(GENERATED_ID); + when(blob.getGeneration()).thenReturn(GENERATION); + when(blob.getMd5()).thenReturn(MD5); + when(blob.getMediaLink()).thenReturn(MEDIA_LINK); + when(blob.getMetageneration()).thenReturn(METAGENERATION); + when(blob.getSelfLink()).thenReturn(URI); + when(blob.getCreateTime()).thenReturn(CREATE_TIME); + when(blob.getUpdateTime()).thenReturn(UPDATE_TIME); + + runner.enqueue("test"); + runner.run(); + + 
runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutGCSObject.REL_SUCCESS).get(0); + + mockFlowFile.assertAttributeEquals(BUCKET_ATTR, BUCKET); + mockFlowFile.assertAttributeEquals(KEY_ATTR, KEY); + mockFlowFile.assertAttributeEquals(SIZE_ATTR, String.valueOf(SIZE)); + mockFlowFile.assertAttributeEquals(CACHE_CONTROL_ATTR, CACHE_CONTROL); + mockFlowFile.assertAttributeEquals(COMPONENT_COUNT_ATTR, String.valueOf(COMPONENT_COUNT)); + mockFlowFile.assertAttributeEquals(CONTENT_DISPOSITION_ATTR, CONTENT_DISPOSITION); + mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), FILENAME); + mockFlowFile.assertAttributeEquals(CONTENT_ENCODING_ATTR, CONTENT_ENCODING); + mockFlowFile.assertAttributeEquals(CONTENT_LANGUAGE_ATTR, CONTENT_LANGUAGE); + mockFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CONTENT_TYPE); + mockFlowFile.assertAttributeEquals(CRC32C_ATTR, CRC32C); + mockFlowFile.assertAttributeEquals(ENCRYPTION_ALGORITHM_ATTR, ENCRYPTION); + mockFlowFile.assertAttributeEquals(ENCRYPTION_SHA256_ATTR, ENCRYPTION_SHA256); + mockFlowFile.assertAttributeEquals(ETAG_ATTR, ETAG); + mockFlowFile.assertAttributeEquals(GENERATED_ID_ATTR, GENERATED_ID); + mockFlowFile.assertAttributeEquals(GENERATION_ATTR, String.valueOf(GENERATION)); + mockFlowFile.assertAttributeEquals(MD5_ATTR, MD5); + mockFlowFile.assertAttributeEquals(MEDIA_LINK_ATTR, MEDIA_LINK); + mockFlowFile.assertAttributeEquals(METAGENERATION_ATTR, String.valueOf(METAGENERATION)); + mockFlowFile.assertAttributeEquals(URI_ATTR, URI); + mockFlowFile.assertAttributeEquals(CREATE_TIME_ATTR, String.valueOf(CREATE_TIME)); + mockFlowFile.assertAttributeEquals(UPDATE_TIME_ATTR, String.valueOf(UPDATE_TIME)); + } + + @Test + public void testAclAttributeUser() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final 
TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class))) + .thenReturn(blob); + + final Acl.User mockUser = mock(Acl.User.class); + when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL); + when(blob.getOwner()).thenReturn(mockUser); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutGCSObject.REL_SUCCESS).get(0); + mockFlowFile.assertAttributeEquals(OWNER_ATTR, OWNER_USER_EMAIL); + mockFlowFile.assertAttributeEquals(OWNER_TYPE_ATTR, "user"); + } + + @Test + public void testAclAttributeGroup() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class))) + .thenReturn(blob); + + final Acl.Group mockGroup = mock(Acl.Group.class); + when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL); + when(blob.getOwner()).thenReturn(mockGroup); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutGCSObject.REL_SUCCESS).get(0); + mockFlowFile.assertAttributeEquals(OWNER_ATTR, OWNER_GROUP_EMAIL); + mockFlowFile.assertAttributeEquals(OWNER_TYPE_ATTR, "group"); + } + + + @Test + public void testAclAttributeDomain() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + 
addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class))) + .thenReturn(blob); + + final Acl.Domain mockDomain = mock(Acl.Domain.class); + when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN); + when(blob.getOwner()).thenReturn(mockDomain); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutGCSObject.REL_SUCCESS).get(0); + mockFlowFile.assertAttributeEquals(OWNER_ATTR, OWNER_DOMAIN); + mockFlowFile.assertAttributeEquals(OWNER_TYPE_ATTR, "domain"); + } + + + @Test + public void testAclAttributeProject() throws Exception { + reset(storage, blob); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class))) + .thenReturn(blob); + + final Acl.Project mockProject = mock(Acl.Project.class); + when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID); + when(blob.getOwner()).thenReturn(mockProject); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS); + runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutGCSObject.REL_SUCCESS).get(0); + mockFlowFile.assertAttributeEquals(OWNER_ATTR, OWNER_PROJECT_ID); + mockFlowFile.assertAttributeEquals(OWNER_TYPE_ATTR, "project"); + } + + @Test + public void testFailureHandling() throws Exception { + reset(storage); + final PutGCSObject processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + 
+ when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class))) + .thenThrow(new StorageException(404, "test exception")); + + runner.enqueue("test"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutGCSObject.REL_FAILURE); + runner.assertTransferCount(PutGCSObject.REL_FAILURE, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutGCSObject.REL_FAILURE).get(0); + assertTrue(mockFlowFile.isPenalized()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/StorageAttributesTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/StorageAttributesTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/StorageAttributesTest.java new file mode 100644 index 0000000..61ec9f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/StorageAttributesTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.storage; + +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Modifier; + +import static org.junit.Assert.assertTrue; + +public class StorageAttributesTest { + @Test + public void testStorageAttributeClassCannotBeInvoked() throws Exception { + Constructor constructor = StorageAttributes.class.getDeclaredConstructor(); + assertTrue("Constructor of StorageAttributes should be private", Modifier.isPrivate(constructor.getModifiers())); + constructor.setAccessible(true); + constructor.newInstance(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/UtilTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/UtilTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/UtilTest.java new file mode 100644 index 0000000..0b2287d --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/UtilTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.storage; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +/** + * Tests the Util class static methods. + */ +public class UtilTest { + @Test + public void testContentDispositionParsing() throws Exception { + final String contentDisposition = "attachment; filename=\"plans.pdf\""; + + final Util.ParsedContentDisposition parsed = Util.parseContentDisposition(contentDisposition); + assertNotNull(parsed); + assertEquals("plans.pdf", + parsed.getFileName()); + + assertEquals("attachment", + parsed.getContentDispositionType()); + } + + @Test + public void testContentDispositionParsingBadParse() throws Exception { + final String contentDisposition = "bad-header"; + + assertNull(Util.parseContentDisposition(contentDisposition)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-application-default-credentials.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-application-default-credentials.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-application-default-credentials.json new file mode 100644 index 0000000..f045ca6 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-application-default-credentials.json @@ -0,0 +1,6 @@ +{ + "client_id": "123456789012-af23m23321knfg00ekrjlwke90rjkewl.apps.googleusercontent.com", + "client_secret": "d-MbDD42LcmsVNVdls21dBAs", + "refresh_token": "1/cvZBer532GBbzsxdf7jj7LOvd-IcmbSa5tgVcls5j5z", + "type": "authorized_user" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-service-account.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-service-account.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-service-account.json new file mode 100644 index 0000000..c1d9190 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mock-gcp-service-account.json @@ -0,0 +1,12 @@ +{ + "type": "service_account", + "project_id": "test", + "private_key_id": "testHash", + "private_key": "-----BEGIN PRIVATE 
KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBewuX0bvBzOq0\nWu0AGbJHvDiCim1jJ/Oec7AIKW2IlafMY5VYussKVyt7zI1GIPrOyMmVed9eTd0E\niAahfYhgsVUEl9fd4m53Gm9A+I14C7p0PLC5IByc+DDYKpHtAf9CPggHcZExiDqU\n0NyxzEcimi/o9Qev2MJH3tv8jNzOVNfv1B5K6DYrce++x+8gU/RetXWn9htwIAsE\nHdfyKR+J7RR8/tUtqx/19RuXMTKCpBKDcHa5LFpv6jA0yLjp+4QeQcCa63azCoJx\nhmD4Y+gvxRV5nGxk7QIAWiax4A5PNlJcYBAn188isC1ZgDU+kyyM+nrejXVoip5r\nd3b/A7sjAgMBAAECggEBALTqeoemzRtFon2svAoo/QSI4opmKCzcsbeLU6H+Ivbh\ngXrj70V9vNfZdMaZGczmj7+GDsDfqdcDldRj4VdmC3zmtKnL1kUbMtHZ/QfSom4L\nAXkpOtKQTVEV3o5zF+p3wJjPajCTqAGZ8bUvq/3xFt8rL/t0C5EJbXlI0YlQqjOf\nj7y/XEo+EBBCwNazkX45tPK0EwQuo8XsWBeCX59BGR5An/LIOixdvxRMaprApNpH\ndS3Ap3d9s0c4YXgugpBbbiVL2hnqag4teXmX6p0tZbDone4VrpSJWOhSnvuBpHbi\neVyu3ByhsgRKKwm0UnOOFvM5N1kUNjIs+TQ4n/Cv4ukCgYEA5Q2+3frLip1AXbXv\n/FuL5WXBgNH9dHIoYVvERu7e7sjUwpqbQn0va+biYzqCoR3UzUAgPPi8U8YdQN1Y\nKJz6dQGDeKvPwVuBr2+s3Va9s8L52tV0dXA0sHb8aFxfc2E+zPQH3eWCRuoWcwZr\nx3YLXvNVnSwDvJB/+q3EhNRwYu0CgYEA2D34bBtqcwZDcgmPdKsPVoshk1uEsa8d\n90i725c6XVbKs5TP/cOWT+DsJmEfHF/mWneJZt0aTh6O6tpEDtIIISBvqdNoztnv\nt4tQ9+HD3p5JjoFeIZlFaTQQZoCC1PgfYa9xutxO8hltaWpp50+S00wEdgUA0zCM\nj884Vhk/hE8CgYA3Ub6LNgr6i0gEWfB/7kw3NwAo8I5aFUgTW2poB0DoQrC/3z8o\nK7vMP5LljDgIWYAPojEnCJvTT8G47Lxh8qe6oobyGeyvMj579Gi3fD+MrsZRR8Q8\nqMDQ7avAOK8E2rOkJDvSJ5/zKI4Lcb2OCsBsSjCfKQYuAGgoTtdrjTMncQKBgAUH\nS+OXr54FI0RfnIpl//FPQvSeSDOpktTRSC0PEzhgcE5Ew6FvDuvEmzk5QPPz9vNb\nnEJcGeR/KWukr7h4gd/jVTVpySImR0DJaJSbF2bx31wE/h9h5Q9ROqBnlKNHMdOf\ntNFXli5jEPxGkTfjzdJEDkaAT0iZ9GrTssetxqBZAoGBAISdPHJSot5rgT5ILqQk\nYVjLopguymhz1QzyXoe1g9lC9KQIUQo6iikmocPpWiupktaB5Ck7gmwsMnWYS2ti\nDeWDAS+QC5W3wy40Gos4SN/FsZKTHD87SPHY82rx0/GvbXJKqZmYMM6M6+fM/jJd\n+kaA70VDxYg60IdOgf7o9HqA\n-----END PRIVATE KEY-----", + "client_email": "test@developer.gserviceaccount.com", + "client_id": "123456789", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://accounts.google.com/o/oauth2/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + 
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%40developer.gserviceaccount.com" +} http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000..ca6ee9c --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/nifi-gcp-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml new file mode 100644 index 0000000..7e413df --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-gcp-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-gcp-processors</module> + <module>nifi-gcp-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index df3aa7d..f0164af 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -73,6 +73,7 @@ <module>nifi-ranger-bundle</module> <module>nifi-websocket-bundle</module> <module>nifi-tcp-bundle</module> + <module>nifi-gcp-bundle</module> </modules> <dependencyManagement> http://git-wip-us.apache.org/repos/asf/nifi/blob/897c7029/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8a5a53f..c968c6f 100644 --- a/pom.xml +++ b/pom.xml @@ -1290,6 +1290,12 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-gcp-nar</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> <version>1.2.0-SNAPSHOT</version> </dependency>
