[
https://issues.apache.org/jira/browse/NIFI-3449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860789#comment-15860789
]
ASF GitHub Bot commented on NIFI-3449:
--------------------------------------
Github user jvwing commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1482#discussion_r100469527
--- Diff:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.storage;
+
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Acl;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_TYPE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_TYPE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"google cloud", "google", "storage", "gcs", "fetch"})
+@CapabilityDescription("Fetches a file from a Google Cloud Bucket.
Designed to be used in tandem with ListGCSBucket.")
+@SeeAlso({ListGCSBucket.class, PutGCSObject.class, DeleteGCSObject.class})
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "The name
of the file, parsed if possible from the " +
+ "Content-Disposition response header"),
+ @WritesAttribute(attribute = BUCKET_ATTR, description =
BUCKET_DESC),
+ @WritesAttribute(attribute = KEY_ATTR, description = KEY_DESC),
+ @WritesAttribute(attribute = SIZE_ATTR, description = SIZE_DESC),
+ @WritesAttribute(attribute = CACHE_CONTROL_ATTR, description =
CACHE_CONTROL_DESC),
+ @WritesAttribute(attribute = COMPONENT_COUNT_ATTR, description =
COMPONENT_COUNT_DESC),
+ @WritesAttribute(attribute = CONTENT_DISPOSITION_ATTR, description
= CONTENT_DISPOSITION_DESC),
+ @WritesAttribute(attribute = CONTENT_ENCODING_ATTR, description =
CONTENT_ENCODING_DESC),
+ @WritesAttribute(attribute = CONTENT_LANGUAGE_ATTR, description =
CONTENT_LANGUAGE_DESC),
+ @WritesAttribute(attribute = CONTENT_TYPE_ATTR, description =
CONTENT_TYPE_DESC),
+ @WritesAttribute(attribute = CRC32C_ATTR, description =
CRC32C_DESC),
+ @WritesAttribute(attribute = CREATE_TIME_ATTR, description =
CREATE_TIME_DESC),
+ @WritesAttribute(attribute = UPDATE_TIME_ATTR, description =
UPDATE_TIME_DESC),
+ @WritesAttribute(attribute = ENCRYPTION_ALGORITHM_ATTR,
description = ENCRYPTION_ALGORITHM_DESC),
+ @WritesAttribute(attribute = ENCRYPTION_SHA256_ATTR, description =
ENCRYPTION_SHA256_DESC),
+ @WritesAttribute(attribute = ETAG_ATTR, description = ETAG_DESC),
+ @WritesAttribute(attribute = GENERATED_ID_ATTR, description =
GENERATED_ID_DESC),
+ @WritesAttribute(attribute = GENERATION_ATTR, description =
GENERATION_DESC),
+ @WritesAttribute(attribute = MD5_ATTR, description = MD5_DESC),
+ @WritesAttribute(attribute = MEDIA_LINK_ATTR, description =
MEDIA_LINK_DESC),
+ @WritesAttribute(attribute = METAGENERATION_ATTR, description =
METAGENERATION_DESC),
+ @WritesAttribute(attribute = OWNER_ATTR, description = OWNER_DESC),
+ @WritesAttribute(attribute = OWNER_TYPE_ATTR, description =
OWNER_TYPE_DESC),
+ @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
+})
+public class FetchGCSObject extends AbstractGCSProcessor {
    /**
     * The key (object name) of the GCS object to fetch. Required; supports
     * Expression Language and defaults to the incoming FlowFile's
     * {@code filename} attribute, which pairs naturally with ListGCSBucket.
     */
    public static final PropertyDescriptor KEY = new PropertyDescriptor
            .Builder().name("gcs-key")
            .displayName("Key")
            .description(KEY_DESC)
            .required(true)
            .defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
+
    /**
     * Optional specific generation (version) of the object to download.
     * When left unset the latest generation is fetched. Supports Expression
     * Language; validated as a positive long when a literal value is given.
     */
    public static final PropertyDescriptor GENERATION = new PropertyDescriptor.Builder()
            .name("gcs-generation")
            .displayName("Object Generation")
            .description("The generation of the Object to download. If null, will download latest generation.")
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
            .required(false)
            .build();
+
    /**
     * Optional customer-supplied AES-256 key (base64-encoded) used to decrypt
     * the object server-side. Marked sensitive so the value is not exposed in
     * the flow configuration; supports Expression Language.
     */
    public static final PropertyDescriptor ENCRYPTION_KEY = new PropertyDescriptor.Builder()
            .name("gcs-server-side-encryption-key")
            .displayName("Server Side Encryption Key")
            .description("An AES256 Key (encoded in base64) which the object has been encrypted in.")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .sensitive(true)
            .build();
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return ImmutableList.<PropertyDescriptor>builder()
+ .addAll(super.getSupportedPropertyDescriptors())
+ .add(KEY)
+ .add(GENERATION)
+ .add(ENCRYPTION_KEY)
+ .build();
+ }
+
+
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ String bucketName = context.getProperty(BUCKET)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+ String key = context.getProperty(KEY)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+ Long generation = context.getProperty(GENERATION)
+ .evaluateAttributeExpressions(flowFile)
+ .asLong();
+ String encryptionKey = context.getProperty(ENCRYPTION_KEY)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+
+ final Storage storage = getCloudService();
+ final Map<String, String> attributes = new HashMap<>();
+ final BlobId blobId = BlobId.of(bucketName, key, generation);
+
+ try {
+ final List<Storage.BlobSourceOption> blobSourceOptions = new
ArrayList<>(2);
+
+ if (encryptionKey != null) {
+
blobSourceOptions.add(Storage.BlobSourceOption.decryptionKey(encryptionKey));
+ }
+
+ if (generation != null) {
+
blobSourceOptions.add(Storage.BlobSourceOption.generationMatch());
+ }
+
+ final Blob blob = storage.get(blobId);
+
+ if (blob == null) {
+ throw new StorageException(404, "Blob " + blobId + " not
found");
+ }
+
+ final ReadChannel reader = storage.reader(blobId,
blobSourceOptions.toArray(new
Storage.BlobSourceOption[blobSourceOptions.size()]));
+
+ //TODO: Implement state checkpoints / ability to restore for
long blob reads
+ flowFile = session.importFrom(Channels.newInputStream(reader),
flowFile);
+
+ attributes.put(BUCKET_ATTR, blob.getBucket());
+ attributes.put(KEY_ATTR, blob.getName());
+
+ if (blob.getSize() != null) {
+ attributes.put(SIZE_ATTR, String.valueOf(blob.getSize()));
+ }
+
+ if (blob.getCacheControl() != null) {
+ attributes.put(CACHE_CONTROL_ATTR, blob.getCacheControl());
+ }
+
+ if (blob.getComponentCount() != null) {
+ attributes.put(COMPONENT_COUNT_ATTR,
String.valueOf(blob.getComponentCount()));
+ }
+
+ if (blob.getContentEncoding() != null) {
+ attributes.put(CONTENT_ENCODING_ATTR,
blob.getContentEncoding());
+ }
+
+ if (blob.getContentLanguage() != null) {
+ attributes.put(CONTENT_LANGUAGE_ATTR,
blob.getContentLanguage());
+ }
+
+ if (blob.getContentType() != null) {
+ attributes.put(CONTENT_TYPE_ATTR, blob.getContentType());
+ }
+
+ if (blob.getCrc32c() != null) {
+ attributes.put(CRC32C_ATTR, blob.getCrc32c());
+ }
+
+ if (blob.getCustomerEncryption() != null) {
+ final BlobInfo.CustomerEncryption encryption =
blob.getCustomerEncryption();
+
+ attributes.put(ENCRYPTION_ALGORITHM_ATTR,
encryption.getEncryptionAlgorithm());
+ attributes.put(ENCRYPTION_SHA256_ATTR,
encryption.getKeySha256());
+ }
+
+ if (blob.getEtag() != null) {
+ attributes.put(ETAG_ATTR, blob.getEtag());
+ }
+
+ if (blob.getGeneratedId() != null) {
+ attributes.put(GENERATED_ID_ATTR, blob.getGeneratedId());
+ }
+
+ if (blob.getGeneration() != null) {
+ attributes.put(GENERATION_ATTR,
String.valueOf(blob.getGeneration()));
+ }
+
+ if (blob.getMd5() != null) {
+ attributes.put(MD5_ATTR, blob.getMd5());
+ }
+
+ if (blob.getMediaLink() != null) {
+ attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink());
+ }
+
+ if (blob.getMetageneration() != null) {
+ attributes.put(METAGENERATION_ATTR,
String.valueOf(blob.getMetageneration()));
+ }
+
+ if (blob.getOwner() != null) {
+ final Acl.Entity entity = blob.getOwner();
+
+ if (entity instanceof Acl.User) {
+ attributes.put(OWNER_ATTR, ((Acl.User)
entity).getEmail());
+ attributes.put(OWNER_TYPE_ATTR, "user");
+ } else if (entity instanceof Acl.Group) {
+ attributes.put(OWNER_ATTR, ((Acl.Group)
entity).getEmail());
+ attributes.put(OWNER_TYPE_ATTR, "group");
+ } else if (entity instanceof Acl.Domain) {
+ attributes.put(OWNER_ATTR, ((Acl.Domain)
entity).getDomain());
+ attributes.put(OWNER_TYPE_ATTR, "domain");
+ } else if (entity instanceof Acl.Project) {
+ attributes.put(OWNER_ATTR, ((Acl.Project)
entity).getProjectId());
+ attributes.put(OWNER_TYPE_ATTR, "project");
+ }
+ }
+
+ if (blob.getSelfLink() != null) {
+ attributes.put(URI_ATTR, blob.getSelfLink());
+ }
+
+ if (blob.getContentDisposition() != null) {
+ attributes.put(CONTENT_DISPOSITION_ATTR,
blob.getContentDisposition());
+
+ final Util.ParsedContentDisposition
parsedContentDisposition =
Util.parseContentDisposition(blob.getContentDisposition());
+
+ if (parsedContentDisposition != null) {
+ attributes.put(CoreAttributes.FILENAME.key(),
parsedContentDisposition.getFileName());
+ }
+ }
+
+ if (blob.getCreateTime() != null) {
+ attributes.put(CREATE_TIME_ATTR,
String.valueOf(blob.getCreateTime()));
+ }
+
+ if (blob.getUpdateTime() != null) {
+ attributes.put(UPDATE_TIME_ATTR,
String.valueOf(blob.getUpdateTime()));
+ }
+
+ //TODO: Have some sensible way of including user defined
metadata attached to the Blob
--- End diff --
Same thing about TODO comments
> Create Google Cloud Platform/Google Cloud Storage Processors
> ------------------------------------------------------------
>
> Key: NIFI-3449
> URL: https://issues.apache.org/jira/browse/NIFI-3449
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Gene Peters
> Labels: features
>
> Hi all,
> We had a need in our production deployments to interact with Google Cloud
> Storage. At the time, NIFI-2809 hadn't seen much movement, and after applying
> the patch I found that the configuration was too specific for my needs (it's
> hardcoded to use "Application Default" credentials, everything uploaded to
> GCS is uploaded with the "public" ACL, etc). So I created a series of
> Processors / Controller Services based off of the AWS NiFi library, and would
> like to contribute them.
> Features:
> * All credentialing is handled by a controller service, allowing multiple
> processors to use the same service / credentials
> * An Abstract processor is provided which forms the basis for all GCP related
> processors.
> * The standard Google Cloud Storage operations are supported, very similarly
> to the AWS S3 processors: ListGCSBucket, DeleteGCSObject, FetchGCSObject,
> PutGCSObject
> * Everything is documented and unit tested.
> * I've also provided integration tests, but they're disabled by default (as
> they require Google Cloud credentials). To run them, use the flag
> {{skipGCPIntegrationTests=false}}
> Todo:
> * The GCP Java library's ReadChannel objects implement the "restorable"
> interface, which allows for state saving / checkpointing. I'd really like to
> leverage this with the State support that NiFi provides, but it would require
> serializing / deserializing the object.
> I'm going to be submitting this as a pull request through GitHub.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)