This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new cff7808543 NIFI-11439 Corrected GCS Transit URI for custom Storage API 
URL
cff7808543 is described below

commit cff7808543716da5ca9f93356d7d762e15fdd143
Author: Paul Grey <[email protected]>
AuthorDate: Thu Apr 13 18:36:31 2023 -0400

    NIFI-11439 Corrected GCS Transit URI for custom Storage API URL
    
    This closes #7173
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../gcp/storage/AbstractGCSProcessor.java          |  6 ++++
 .../processors/gcp/storage/FetchGCSObject.java     |  4 ++-
 .../nifi/processors/gcp/storage/PutGCSObject.java  |  4 +--
 .../processors/gcp/storage/FetchGCSObjectTest.java | 33 +++++++++++++++----
 .../processors/gcp/storage/PutGCSObjectTest.java   | 38 +++++++++++++++++-----
 5 files changed, 67 insertions(+), 18 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index ad2c248cb5..200bf53e01 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -37,6 +37,7 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -167,4 +168,9 @@ public abstract class AbstractGCSProcessor extends 
AbstractGCPProcessor<Storage,
 
         return  
storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
     }
+
+    protected String getTransitUri(final String storageApiUrl, final String 
bucketName, final String key) {
+        final URI apiUri = URI.create(storageApiUrl);
+        return String.format("%s://%s.%s/%s", apiUri.getScheme(), bucketName, 
apiUri.getHost(), key);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index 058c948c93..7bf16277ba 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -273,7 +273,9 @@ public class FetchGCSObject extends AbstractGCSProcessor {
 
         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos);
         getLogger().info("Successfully retrieved GCS Object for {} in {} 
millis; routing to success", new Object[]{flowFile, millis});
-        session.getProvenanceReporter().fetch(flowFile, "https://"; + 
bucketName + ".storage.googleapis.com/" + key, millis);
+
+        final String transitUri = 
getTransitUri(storage.getOptions().getHost(), bucketName, key);
+        session.getProvenanceReporter().fetch(flowFile, transitUri, millis);
     }
 
     private FetchedBlob fetchBlob(final ProcessContext context, final Storage 
storage, final Map<String, String> attributes) throws IOException {
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 393054e435..ef33008b0c 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -542,9 +542,9 @@ public class PutGCSObject extends AbstractGCSProcessor {
             }
             session.transfer(flowFile, REL_SUCCESS);
             final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            final String url = "https://"; + bucket + 
".storage.googleapis.com/" + key;
 
-            session.getProvenanceReporter().send(flowFile, url, millis);
+            final String transitUri = 
getTransitUri(storage.getOptions().getHost(), bucket, key);
+            session.getProvenanceReporter().send(flowFile, transitUri, millis);
             getLogger().info("Successfully put {} to Google Cloud Storage in 
{} milliseconds",
                     new Object[]{ff, millis});
 
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
index 2d15396752..893f351ef5 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
@@ -24,6 +24,7 @@ import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
@@ -102,10 +103,14 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
     private static final String OWNER_DOMAIN = "test-owner-domain";
     private static final String OWNER_PROJECT_ID = "test-owner-project-id";
     private static final String URI = "test-uri";
+    private static final String STORAGE_API_URL = "https://localhost";;
     private static final String CONTENT_DISPOSITION = "attachment; 
filename=\"test-content-disposition.txt\"";
     private static final Long CREATE_TIME = 1234L;
     private static final Long UPDATE_TIME = 4567L;
 
+    @Mock
+    StorageOptions storageOptions;
+
     @Mock
     Storage storage;
 
@@ -192,7 +197,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testSuccessfulFetch() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
@@ -347,7 +354,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerUser() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
@@ -387,7 +396,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerGroup() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
@@ -429,7 +440,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerDomain() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
@@ -471,7 +484,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerProject() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
@@ -510,7 +525,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testBlobIdWithGeneration() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
         addRequiredPropertiesToRunner(runner);
 
@@ -568,7 +585,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testBlobIdWithEncryption() throws Exception {
-        reset(storage);
+        reset(storageOptions, storage);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final TestRunner runner = buildNewRunner(getProcessor());
 
         runner.setProperty(FetchGCSObject.ENCRYPTION_KEY, ENCRYPTION_SHA256);
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
index a908e91254..2de1bd6b5a 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
@@ -21,6 +21,7 @@ import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -99,11 +100,15 @@ public class PutGCSObjectTest extends AbstractGCSTest {
     private static final String OWNER_DOMAIN = "test-owner-domain";
     private static final String OWNER_PROJECT_ID = "test-owner-project-id";
     private static final String URI = "test-uri";
+    private static final String STORAGE_API_URL = "https://localhost";;
     private static final String CONTENT_DISPOSITION = "attachment; 
filename=\"" + FILENAME + "\"";
     private static final Long CREATE_TIME = 1234L;
     private static final Long UPDATE_TIME = 4567L;
     private final static Long GENERATION = 5L;
 
+    @Mock
+    StorageOptions storageOptions;
+
     @Mock
     Storage storage;
 
@@ -136,7 +141,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testSuccessfulPutOperationNoParameters() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -167,7 +174,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testSuccessfulPutOperation() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -245,7 +254,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testSuccessfulPutOperationWithUserMetadata() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -292,7 +303,10 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAttributesSetOnSuccessfulPut() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
+
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -360,7 +374,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclAttributeUser() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -386,7 +402,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclAttributeGroup() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -413,7 +431,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclAttributeDomain() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -440,7 +460,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
 
     @Test
     public void testAclAttributeProject() throws Exception {
-        reset(storage, blob);
+        reset(storageOptions, storage, blob);
+        when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+        when(storage.getOptions()).thenReturn(storageOptions);
         final PutGCSObject processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);

Reply via email to