This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new cff7808543 NIFI-11439 Corrected GCS Transit URI for custom Storage API
URL
cff7808543 is described below
commit cff7808543716da5ca9f93356d7d762e15fdd143
Author: Paul Grey <[email protected]>
AuthorDate: Thu Apr 13 18:36:31 2023 -0400
NIFI-11439 Corrected GCS Transit URI for custom Storage API URL
This closes #7173
Signed-off-by: David Handermann <[email protected]>
---
.../gcp/storage/AbstractGCSProcessor.java | 6 ++++
.../processors/gcp/storage/FetchGCSObject.java | 4 ++-
.../nifi/processors/gcp/storage/PutGCSObject.java | 4 +--
.../processors/gcp/storage/FetchGCSObjectTest.java | 33 +++++++++++++++----
.../processors/gcp/storage/PutGCSObjectTest.java | 38 +++++++++++++++++-----
5 files changed, 67 insertions(+), 18 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index ad2c248cb5..200bf53e01 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -37,6 +37,7 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -167,4 +168,9 @@ public abstract class AbstractGCSProcessor extends
AbstractGCPProcessor<Storage,
return
storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
}
+
+ protected String getTransitUri(final String storageApiUrl, final String
bucketName, final String key) {
+ final URI apiUri = URI.create(storageApiUrl);
+ return String.format("%s://%s.%s/%s", apiUri.getScheme(), bucketName,
apiUri.getHost(), key);
+ }
}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index 058c948c93..7bf16277ba 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -273,7 +273,9 @@ public class FetchGCSObject extends AbstractGCSProcessor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNanos);
getLogger().info("Successfully retrieved GCS Object for {} in {}
millis; routing to success", new Object[]{flowFile, millis});
- session.getProvenanceReporter().fetch(flowFile, "https://" +
bucketName + ".storage.googleapis.com/" + key, millis);
+
+ final String transitUri =
getTransitUri(storage.getOptions().getHost(), bucketName, key);
+ session.getProvenanceReporter().fetch(flowFile, transitUri, millis);
}
private FetchedBlob fetchBlob(final ProcessContext context, final Storage
storage, final Map<String, String> attributes) throws IOException {
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 393054e435..ef33008b0c 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -542,9 +542,9 @@ public class PutGCSObject extends AbstractGCSProcessor {
}
session.transfer(flowFile, REL_SUCCESS);
final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- final String url = "https://" + bucket +
".storage.googleapis.com/" + key;
- session.getProvenanceReporter().send(flowFile, url, millis);
+ final String transitUri =
getTransitUri(storage.getOptions().getHost(), bucket, key);
+ session.getProvenanceReporter().send(flowFile, transitUri, millis);
getLogger().info("Successfully put {} to Google Cloud Storage in
{} milliseconds",
new Object[]{ff, millis});
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
index 2d15396752..893f351ef5 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
@@ -24,6 +24,7 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
@@ -102,10 +103,14 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
private static final String OWNER_DOMAIN = "test-owner-domain";
private static final String OWNER_PROJECT_ID = "test-owner-project-id";
private static final String URI = "test-uri";
+ private static final String STORAGE_API_URL = "https://localhost";
private static final String CONTENT_DISPOSITION = "attachment;
filename=\"test-content-disposition.txt\"";
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;
+ @Mock
+ StorageOptions storageOptions;
+
@Mock
Storage storage;
@@ -192,7 +197,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulFetch() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@@ -347,7 +354,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerUser() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@@ -387,7 +396,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerGroup() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@@ -429,7 +440,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerDomain() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@@ -471,7 +484,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerProject() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@@ -510,7 +525,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testBlobIdWithGeneration() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
@@ -568,7 +585,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testBlobIdWithEncryption() throws Exception {
- reset(storage);
+ reset(storageOptions, storage);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
runner.setProperty(FetchGCSObject.ENCRYPTION_KEY, ENCRYPTION_SHA256);
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
index a908e91254..2de1bd6b5a 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
@@ -21,6 +21,7 @@ import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -99,11 +100,15 @@ public class PutGCSObjectTest extends AbstractGCSTest {
private static final String OWNER_DOMAIN = "test-owner-domain";
private static final String OWNER_PROJECT_ID = "test-owner-project-id";
private static final String URI = "test-uri";
+ private static final String STORAGE_API_URL = "https://localhost";
private static final String CONTENT_DISPOSITION = "attachment;
filename=\"" + FILENAME + "\"";
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;
private final static Long GENERATION = 5L;
+ @Mock
+ StorageOptions storageOptions;
+
@Mock
Storage storage;
@@ -136,7 +141,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulPutOperationNoParameters() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -167,7 +174,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulPutOperation() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -245,7 +254,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulPutOperationWithUserMetadata() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -292,7 +303,10 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAttributesSetOnSuccessfulPut() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
+
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -360,7 +374,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeUser() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -386,7 +402,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeGroup() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -413,7 +431,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeDomain() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -440,7 +460,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeProject() throws Exception {
- reset(storage, blob);
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);