This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bce14f573b NIFI-12643 Added support for FileResourceService in
PutGCSObject
bce14f573b is described below
commit bce14f573b57685637d776333029641e62730d26
Author: Balázs Gerner <[email protected]>
AuthorDate: Fri Jan 19 09:49:30 2024 +0100
NIFI-12643 Added support for FileResourceService in PutGCSObject
This closes #8281.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 5 +
.../nifi/processors/gcp/storage/PutGCSObject.java | 375 +++++++++++----------
.../processors/gcp/storage/PutGCSObjectTest.java | 55 ++-
3 files changed, 242 insertions(+), 193 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 70f104365e..897e710395 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -69,6 +69,11 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-resource-transfer</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service-api</artifactId>
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 07546637da..438fac19b8 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -22,15 +22,6 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -43,13 +34,23 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static
com.google.cloud.storage.Storage.PredefinedAcl.ALL_AUTHENTICATED_USERS;
import static
com.google.cloud.storage.Storage.PredefinedAcl.AUTHENTICATED_READ;
@@ -102,6 +103,9 @@ import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TI
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -290,6 +294,8 @@ public class PutGCSObject extends AbstractGCSProcessor {
final List<PropertyDescriptor> descriptors = new
ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(BUCKET);
descriptors.add(KEY);
+ descriptors.add(RESOURCE_TRANSFER_SOURCE);
+ descriptors.add(FILE_RESOURCE_SERVICE);
descriptors.add(CONTENT_TYPE);
descriptors.add(CRC32C);
descriptors.add(ACL);
@@ -322,199 +328,196 @@ public class PutGCSObject extends AbstractGCSProcessor {
return;
}
- final long startNanos = System.nanoTime();
+ try {
+ final long startNanos = System.nanoTime();
+
+ final String bucket = context.getProperty(BUCKET)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final String key = context.getProperty(KEY)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final boolean overwrite =
context.getProperty(OVERWRITE).asBoolean();
+
+ final FlowFile ff = flowFile;
+ final String ffFilename =
ff.getAttributes().get(CoreAttributes.FILENAME.key());
+ final Map<String, String> attributes = new HashMap<>();
+ final ResourceTransferSource resourceTransferSource =
context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+ final Storage storage = getCloudService();
- final String bucket = context.getProperty(BUCKET)
- .evaluateAttributeExpressions(flowFile)
- .getValue();
- final String key = context.getProperty(KEY)
- .evaluateAttributeExpressions(flowFile)
- .getValue();
- final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
+ try (final InputStream inputStream =
getFileResource(resourceTransferSource, context, flowFile.getAttributes())
+ .map(FileResource::getInputStream)
+ .orElseGet(() -> session.read(ff))) {
- final FlowFile ff = flowFile;
- final String ffFilename =
ff.getAttributes().get(CoreAttributes.FILENAME.key());
- final Map<String, String> attributes = new HashMap<>();
+ final BlobId id = BlobId.of(bucket, key);
+ final BlobInfo.Builder blobInfoBuilder =
BlobInfo.newBuilder(id);
+ final List<Storage.BlobWriteOption> blobWriteOptions = new
ArrayList<>();
- try {
- final Storage storage = getCloudService();
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream rawIn) throws IOException {
- try (final InputStream in = new
BufferedInputStream(rawIn)) {
- final BlobId id = BlobId.of(bucket, key);
- final BlobInfo.Builder blobInfoBuilder =
BlobInfo.newBuilder(id);
- final List<Storage.BlobWriteOption> blobWriteOptions =
new ArrayList<>();
-
- if (!overwrite) {
-
blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist());
- }
+ if (!overwrite) {
+
blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist());
+ }
- final String contentDispositionType =
context.getProperty(CONTENT_DISPOSITION_TYPE).getValue();
- if (contentDispositionType != null) {
-
blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" +
ffFilename);
- }
+ final String contentDispositionType =
context.getProperty(CONTENT_DISPOSITION_TYPE).getValue();
+ if (contentDispositionType != null) {
+
blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" +
ffFilename);
+ }
- final String contentType =
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(ff).getValue();
- if (contentType != null) {
- blobInfoBuilder.setContentType(contentType);
- }
+ final String contentType =
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(ff).getValue();
+ if (contentType != null) {
+ blobInfoBuilder.setContentType(contentType);
+ }
- final String crc32c =
context.getProperty(CRC32C).evaluateAttributeExpressions(ff).getValue();
- if (crc32c != null) {
- blobInfoBuilder.setCrc32c(crc32c);
-
blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch());
- }
+ final String crc32c =
context.getProperty(CRC32C).evaluateAttributeExpressions(ff).getValue();
+ if (crc32c != null) {
+ blobInfoBuilder.setCrc32c(crc32c);
+
blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch());
+ }
- final String acl = context.getProperty(ACL).getValue();
- if (acl != null) {
-
blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl(
- Storage.PredefinedAcl.valueOf(acl)
- ));
- }
+ final String acl = context.getProperty(ACL).getValue();
+ if (acl != null) {
+ blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl(
+ Storage.PredefinedAcl.valueOf(acl)
+ ));
+ }
- final String encryptionKey =
context.getProperty(ENCRYPTION_KEY)
- .evaluateAttributeExpressions(ff).getValue();
- if (encryptionKey != null) {
-
blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey));
- }
+ final String encryptionKey =
context.getProperty(ENCRYPTION_KEY)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (encryptionKey != null) {
+
blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey));
+ }
- final boolean gzipCompress =
context.getProperty(GZIPCONTENT).asBoolean();
- if (!gzipCompress){
-
blobWriteOptions.add(Storage.BlobWriteOption.disableGzipContent());
- }
+ final boolean gzipCompress =
context.getProperty(GZIPCONTENT).asBoolean();
+ if (!gzipCompress) {
+
blobWriteOptions.add(Storage.BlobWriteOption.disableGzipContent());
+ }
- final HashMap<String, String> userMetadata = new
HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry
: context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- final String value = context.getProperty(
-
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
- userMetadata.put(entry.getKey().getName(),
value);
- }
- }
+ final HashMap<String, String> userMetadata = new HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ final String value = context.getProperty(
+
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+ userMetadata.put(entry.getKey().getName(), value);
+ }
+ }
+
+ if (!userMetadata.isEmpty()) {
+ blobInfoBuilder.setMetadata(userMetadata);
+ }
- if (!userMetadata.isEmpty()) {
- blobInfoBuilder.setMetadata(userMetadata);
+ try {
+ final Blob blob =
storage.createFrom(blobInfoBuilder.build(),
+ inputStream,
+ blobWriteOptions.toArray(new
Storage.BlobWriteOption[blobWriteOptions.size()])
+ );
+
+ // Create attributes
+ attributes.put(BUCKET_ATTR, blob.getBucket());
+ attributes.put(KEY_ATTR, blob.getName());
+
+
+ if (blob.getSize() != null) {
+ attributes.put(SIZE_ATTR,
String.valueOf(blob.getSize()));
+ }
+
+ if (blob.getCacheControl() != null) {
+ attributes.put(CACHE_CONTROL_ATTR,
blob.getCacheControl());
+ }
+
+ if (blob.getComponentCount() != null) {
+ attributes.put(COMPONENT_COUNT_ATTR,
String.valueOf(blob.getComponentCount()));
+ }
+
+ if (blob.getContentDisposition() != null) {
+ attributes.put(CONTENT_DISPOSITION_ATTR,
blob.getContentDisposition());
+ final Util.ParsedContentDisposition parsed =
Util.parseContentDisposition(blob.getContentDisposition());
+
+ if (parsed != null) {
+ attributes.put(CoreAttributes.FILENAME.key(),
parsed.getFileName());
}
+ }
+
+ if (blob.getContentEncoding() != null) {
+ attributes.put(CONTENT_ENCODING_ATTR,
blob.getContentEncoding());
+ }
+
+ if (blob.getContentLanguage() != null) {
+ attributes.put(CONTENT_LANGUAGE_ATTR,
blob.getContentLanguage());
+ }
+
+ if (blob.getContentType() != null) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
blob.getContentType());
+ }
+
+ if (blob.getCrc32c() != null) {
+ attributes.put(CRC32C_ATTR, blob.getCrc32c());
+ }
+
+ if (blob.getCustomerEncryption() != null) {
+ final BlobInfo.CustomerEncryption encryption =
blob.getCustomerEncryption();
- try {
- final Blob blob =
storage.createFrom(blobInfoBuilder.build(),
- in,
- blobWriteOptions.toArray(new
Storage.BlobWriteOption[blobWriteOptions.size()])
- );
-
- // Create attributes
- attributes.put(BUCKET_ATTR, blob.getBucket());
- attributes.put(KEY_ATTR, blob.getName());
-
-
- if (blob.getSize() != null) {
- attributes.put(SIZE_ATTR,
String.valueOf(blob.getSize()));
- }
-
- if (blob.getCacheControl() != null) {
- attributes.put(CACHE_CONTROL_ATTR,
blob.getCacheControl());
- }
-
- if (blob.getComponentCount() != null) {
- attributes.put(COMPONENT_COUNT_ATTR,
String.valueOf(blob.getComponentCount()));
- }
-
- if (blob.getContentDisposition() != null) {
- attributes.put(CONTENT_DISPOSITION_ATTR,
blob.getContentDisposition());
- final Util.ParsedContentDisposition parsed =
Util.parseContentDisposition(blob.getContentDisposition());
-
- if (parsed != null) {
-
attributes.put(CoreAttributes.FILENAME.key(), parsed.getFileName());
- }
- }
-
- if (blob.getContentEncoding() != null) {
- attributes.put(CONTENT_ENCODING_ATTR,
blob.getContentEncoding());
- }
-
- if (blob.getContentLanguage() != null) {
- attributes.put(CONTENT_LANGUAGE_ATTR,
blob.getContentLanguage());
- }
-
- if (blob.getContentType() != null) {
- attributes.put(CoreAttributes.MIME_TYPE.key(),
blob.getContentType());
- }
-
- if (blob.getCrc32c() != null) {
- attributes.put(CRC32C_ATTR, blob.getCrc32c());
- }
-
- if (blob.getCustomerEncryption() != null) {
- final BlobInfo.CustomerEncryption encryption =
blob.getCustomerEncryption();
-
- attributes.put(ENCRYPTION_ALGORITHM_ATTR,
encryption.getEncryptionAlgorithm());
- attributes.put(ENCRYPTION_SHA256_ATTR,
encryption.getKeySha256());
- }
-
- if (blob.getEtag() != null) {
- attributes.put(ETAG_ATTR, blob.getEtag());
- }
-
- if (blob.getGeneratedId() != null) {
- attributes.put(GENERATED_ID_ATTR,
blob.getGeneratedId());
- }
-
- if (blob.getGeneration() != null) {
- attributes.put(GENERATION_ATTR,
String.valueOf(blob.getGeneration()));
- }
-
- if (blob.getMd5() != null) {
- attributes.put(MD5_ATTR, blob.getMd5());
- }
-
- if (blob.getMediaLink() != null) {
- attributes.put(MEDIA_LINK_ATTR,
blob.getMediaLink());
- }
-
- if (blob.getMetageneration() != null) {
- attributes.put(METAGENERATION_ATTR,
String.valueOf(blob.getMetageneration()));
- }
-
- if (blob.getOwner() != null) {
- final Acl.Entity entity = blob.getOwner();
-
- if (entity instanceof Acl.User) {
- attributes.put(OWNER_ATTR, ((Acl.User)
entity).getEmail());
- attributes.put(OWNER_TYPE_ATTR, "user");
- } else if (entity instanceof Acl.Group) {
- attributes.put(OWNER_ATTR, ((Acl.Group)
entity).getEmail());
- attributes.put(OWNER_TYPE_ATTR, "group");
- } else if (entity instanceof Acl.Domain) {
- attributes.put(OWNER_ATTR, ((Acl.Domain)
entity).getDomain());
- attributes.put(OWNER_TYPE_ATTR, "domain");
- } else if (entity instanceof Acl.Project) {
- attributes.put(OWNER_ATTR, ((Acl.Project)
entity).getProjectId());
- attributes.put(OWNER_TYPE_ATTR, "project");
- }
- }
-
- if (blob.getSelfLink() != null) {
- attributes.put(URI_ATTR, blob.getSelfLink());
- }
-
- if (blob.getCreateTimeOffsetDateTime() != null) {
- attributes.put(CREATE_TIME_ATTR,
String.valueOf(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli()));
- }
-
- if (blob.getUpdateTimeOffsetDateTime() != null) {
- attributes.put(UPDATE_TIME_ATTR,
String.valueOf(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli()));
- }
- } catch (StorageException e) {
- getLogger().error("Failure completing upload
flowfile={} bucket={} key={} reason={}",
- ffFilename, bucket, key, e.getMessage(),
e);
- throw (e);
+ attributes.put(ENCRYPTION_ALGORITHM_ATTR,
encryption.getEncryptionAlgorithm());
+ attributes.put(ENCRYPTION_SHA256_ATTR,
encryption.getKeySha256());
+ }
+
+ if (blob.getEtag() != null) {
+ attributes.put(ETAG_ATTR, blob.getEtag());
+ }
+
+ if (blob.getGeneratedId() != null) {
+ attributes.put(GENERATED_ID_ATTR,
blob.getGeneratedId());
+ }
+
+ if (blob.getGeneration() != null) {
+ attributes.put(GENERATION_ATTR,
String.valueOf(blob.getGeneration()));
+ }
+
+ if (blob.getMd5() != null) {
+ attributes.put(MD5_ATTR, blob.getMd5());
+ }
+
+ if (blob.getMediaLink() != null) {
+ attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink());
+ }
+
+ if (blob.getMetageneration() != null) {
+ attributes.put(METAGENERATION_ATTR,
String.valueOf(blob.getMetageneration()));
+ }
+
+ if (blob.getOwner() != null) {
+ final Acl.Entity entity = blob.getOwner();
+
+ if (entity instanceof Acl.User) {
+ attributes.put(OWNER_ATTR, ((Acl.User)
entity).getEmail());
+ attributes.put(OWNER_TYPE_ATTR, "user");
+ } else if (entity instanceof Acl.Group) {
+ attributes.put(OWNER_ATTR, ((Acl.Group)
entity).getEmail());
+ attributes.put(OWNER_TYPE_ATTR, "group");
+ } else if (entity instanceof Acl.Domain) {
+ attributes.put(OWNER_ATTR, ((Acl.Domain)
entity).getDomain());
+ attributes.put(OWNER_TYPE_ATTR, "domain");
+ } else if (entity instanceof Acl.Project) {
+ attributes.put(OWNER_ATTR, ((Acl.Project)
entity).getProjectId());
+ attributes.put(OWNER_TYPE_ATTR, "project");
}
+ }
+ if (blob.getSelfLink() != null) {
+ attributes.put(URI_ATTR, blob.getSelfLink());
+ }
+
+ if (blob.getCreateTimeOffsetDateTime() != null) {
+ attributes.put(CREATE_TIME_ATTR,
String.valueOf(blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli()));
+ }
+ if (blob.getUpdateTimeOffsetDateTime() != null) {
+ attributes.put(UPDATE_TIME_ATTR,
String.valueOf(blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli()));
}
+ } catch (StorageException | IOException e) {
+ getLogger().error("Failure completing upload flowfile={}
bucket={} key={} reason={}",
+ ffFilename, bucket, key, e.getMessage(), e);
+ throw (e);
}
- });
+ }
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -527,7 +530,7 @@ public class PutGCSObject extends AbstractGCSProcessor {
getLogger().info("Successfully put {} to Google Cloud Storage in
{} milliseconds",
new Object[]{ff, millis});
- } catch (final ProcessException | StorageException e) {
+ } catch (final ProcessException | StorageException | IOException e) {
getLogger().error("Failed to put {} to Google Cloud Storage due to
{}", flowFile, e.getMessage(), e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
index 2cb446d87a..0277ce0d6b 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/PutGCSObjectTest.java
@@ -22,6 +22,17 @@ import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
import java.io.InputStream;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -31,13 +42,6 @@ import java.time.ZoneOffset;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
import static com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_READ;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
@@ -62,10 +66,13 @@ import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYP
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -174,6 +181,40 @@ public class PutGCSObjectTest extends AbstractGCSTest {
assertNull(blobInfo.getCrc32c());
}
+ @Test
+ public void testSuccessfulPutOperationFromLocalFileSource() throws
Exception {
+ reset(storageOptions, storage, blob);
+ when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
+ when(storage.getOptions()).thenReturn(storageOptions);
+ final PutGCSObject processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+
+ String serviceId = "fileresource";
+ FileResourceService service = mock(FileResourceService.class);
+ InputStream localFileInputStream = mock(InputStream.class);
+ when(service.getIdentifier()).thenReturn(serviceId);
+ when(service.getFileResource(anyMap())).thenReturn(new
FileResource(localFileInputStream, 10));
+
+ runner.addControllerService(serviceId, service);
+ runner.enableControllerService(service);
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE,
ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ runner.assertValid();
+
+ when(storage.createFrom(blobInfoArgumentCaptor.capture(),
+ inputStreamArgumentCaptor.capture(),
+ blobWriteOptionArgumentCaptor.capture())).thenReturn(blob);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS);
+ runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1);
+ assertEquals(inputStreamArgumentCaptor.getValue(),
localFileInputStream);
+ }
+
@Test
public void testSuccessfulPutOperation() throws Exception {
reset(storageOptions, storage, blob);