GitHub user jvwing commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1482#discussion_r100471880
--- Diff:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
---
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.storage;
+
+import com.google.cloud.storage.Acl;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static
com.google.cloud.storage.Storage.PredefinedAcl.ALL_AUTHENTICATED_USERS;
+import static
com.google.cloud.storage.Storage.PredefinedAcl.AUTHENTICATED_READ;
+import static
com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_FULL_CONTROL;
+import static
com.google.cloud.storage.Storage.PredefinedAcl.BUCKET_OWNER_READ;
+import static com.google.cloud.storage.Storage.PredefinedAcl.PRIVATE;
+import static
com.google.cloud.storage.Storage.PredefinedAcl.PROJECT_PRIVATE;
+import static com.google.cloud.storage.Storage.PredefinedAcl.PUBLIC_READ;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.COMPONENT_COUNT_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_DISPOSITION_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_ENCODING_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_LANGUAGE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_TYPE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CONTENT_TYPE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CRC32C_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.CREATE_TIME_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_ALGORITHM_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ENCRYPTION_SHA256_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.ETAG_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATED_ID_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.GENERATION_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MD5_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.MEDIA_LINK_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.METAGENERATION_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_DESC;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"google", "google cloud", "gcs", "archive", "put"})
+@CapabilityDescription("Puts flow files to a Google Cloud Bucket.")
+@SeeAlso({FetchGCSObject.class, DeleteGCSObject.class,
ListGCSBucket.class})
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add
to the GCS Object",
+ value = "The value of a User-Defined Metadata field to add to the
GCS Object",
+ description = "Allows user-defined metadata to be added to the GCS
object as key/value pairs",
+ supportsExpressionLanguage = true)
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the GCS object")
+@WritesAttributes({
+ @WritesAttribute(attribute = BUCKET_ATTR, description =
BUCKET_DESC),
+ @WritesAttribute(attribute = KEY_ATTR, description = KEY_DESC),
+ @WritesAttribute(attribute = SIZE_ATTR, description = SIZE_DESC),
+ @WritesAttribute(attribute = CACHE_CONTROL_ATTR, description =
CACHE_CONTROL_DESC),
+ @WritesAttribute(attribute = COMPONENT_COUNT_ATTR, description =
COMPONENT_COUNT_DESC),
+ @WritesAttribute(attribute = CONTENT_DISPOSITION_ATTR, description
= CONTENT_DISPOSITION_DESC),
+ @WritesAttribute(attribute = CONTENT_ENCODING_ATTR, description =
CONTENT_ENCODING_DESC),
+ @WritesAttribute(attribute = CONTENT_LANGUAGE_ATTR, description =
CONTENT_LANGUAGE_DESC),
+ @WritesAttribute(attribute = CONTENT_TYPE_ATTR, description =
CONTENT_TYPE_DESC),
+ @WritesAttribute(attribute = CRC32C_ATTR, description =
CRC32C_DESC),
+ @WritesAttribute(attribute = CREATE_TIME_ATTR, description =
CREATE_TIME_DESC),
+ @WritesAttribute(attribute = UPDATE_TIME_ATTR, description =
UPDATE_TIME_DESC),
+ @WritesAttribute(attribute = ENCRYPTION_ALGORITHM_ATTR,
description = ENCRYPTION_ALGORITHM_DESC),
+ @WritesAttribute(attribute = ENCRYPTION_SHA256_ATTR, description =
ENCRYPTION_SHA256_DESC),
+ @WritesAttribute(attribute = ETAG_ATTR, description = ETAG_DESC),
+ @WritesAttribute(attribute = GENERATED_ID_ATTR, description =
GENERATED_ID_DESC),
+ @WritesAttribute(attribute = GENERATION_ATTR, description =
GENERATION_DESC),
+ @WritesAttribute(attribute = MD5_ATTR, description = MD5_DESC),
+ @WritesAttribute(attribute = MEDIA_LINK_ATTR, description =
MEDIA_LINK_DESC),
+ @WritesAttribute(attribute = METAGENERATION_ATTR, description =
METAGENERATION_DESC),
+ @WritesAttribute(attribute = OWNER_ATTR, description = OWNER_DESC),
+ @WritesAttribute(attribute = OWNER_TYPE_ATTR, description =
OWNER_TYPE_DESC),
+ @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
+})
+public class PutGCSObject extends AbstractGCSProcessor {
+ public static final PropertyDescriptor KEY = new PropertyDescriptor
+ .Builder().name("gcs-key")
+ .displayName("Key")
+ .description(KEY_DESC)
+ .required(true)
+ .defaultValue("${filename}")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CONTENT_TYPE = new
PropertyDescriptor
+ .Builder().name("gcs-content-type")
+ .displayName("Content Type")
+ .description("Content Type for the file, i.e.
text/plain")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MD5 = new PropertyDescriptor
+ .Builder().name("gcs-object-md5")
+ .displayName("MD5 Hash")
+ .description("MD5 Hash (encoded in Base64) of the file for
server-side validation.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ public static final PropertyDescriptor CRC32C = new PropertyDescriptor
+ .Builder().name("gcs-object-crc32c")
+ .displayName("CRC32C Checksum")
+ .description("CRC32C Checksum (encoded in Base64, big-Endian
order) of the file for server-side validation.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final AllowableValue ACL_ALL_AUTHENTICATED_USERS = new
AllowableValue(
+ ALL_AUTHENTICATED_USERS.name(), "All Authenticated Users",
"Gives the bucket or object owner OWNER " +
+ "permission, and gives all authenticated Google account
holders READER and WRITER permissions. " +
+ "All other permissions are removed."
+ );
+
+ public static final AllowableValue ACL_AUTHENTICATED_READ = new
AllowableValue(
+ AUTHENTICATED_READ.name(), "Authenticated Read", "Gives the
bucket or object owner OWNER permission, " +
+ "and gives all authenticated Google account holders READER
permission. All other permissions are removed."
+ );
+
+ public static final AllowableValue ACL_BUCKET_OWNER_FULL_CONTROL = new
AllowableValue(
+ BUCKET_OWNER_FULL_CONTROL.name(), "Bucket Owner Full Control",
"Gives the object and bucket owners OWNER " +
+ "permission. All other permissions are removed."
+ );
+
+ public static final AllowableValue ACL_BUCKET_OWNER_READ = new
AllowableValue(
+ BUCKET_OWNER_READ.name(), "Bucket Owner Read Only", "Gives the
object owner OWNER permission, and gives " +
+ "the bucket owner READER permission. All other permissions are
removed."
+ );
+
+ public static final AllowableValue ACL_PRIVATE = new AllowableValue(
+ PRIVATE.name(), "Private", "Gives the bucket or object owner
OWNER permission for a bucket or object, " +
+ "and removes all other access permissions."
+ );
+
+ public static final AllowableValue ACL_PROJECT_PRIVATE = new
AllowableValue(
+ PROJECT_PRIVATE.name(), "Project Private", "Gives permission
to the project team based on their roles. " +
+ "Anyone who is part of the team has READER permission. Project
owners and project editors have OWNER " +
+ "permission. This is the default ACL for newly created
buckets. This is also the default ACL for newly " +
+ "created objects unless the default object ACL for that bucket
has been changed."
+ );
+
+ public static final AllowableValue ACL_PUBLIC_READ = new
AllowableValue(
+ PUBLIC_READ.name(), "Public Read Only", "Gives the bucket or
object owner OWNER permission, and gives all " +
+ "users, both authenticated and anonymous, READER permission.
When you apply this to an object, anyone on " +
+ "the Internet can read the object without authenticating."
+ );
+
+ public static final PropertyDescriptor ACL = new
PropertyDescriptor.Builder()
+ .name("gcs-object-acl")
+ .displayName("Object ACL")
+ .description("Access Control to be attached to the object
uploaded. Not providing this will revert to bucket defaults.")
+ .required(false)
+ .allowableValues(
+ ACL_ALL_AUTHENTICATED_USERS,
+ ACL_AUTHENTICATED_READ,
+ ACL_BUCKET_OWNER_FULL_CONTROL,
+ ACL_BUCKET_OWNER_READ,
+ ACL_PRIVATE,
+ ACL_PROJECT_PRIVATE,
+ ACL_PUBLIC_READ)
+ .build();
+
+ public static final PropertyDescriptor ENCRYPTION_KEY = new
PropertyDescriptor.Builder()
+ .name("gcs-server-side-encryption-key")
+ .displayName("Server Side Encryption Key")
+ .description("An AES256 Encryption Key (encoded in base64) for
server-side encryption of the object.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+
+ public static final PropertyDescriptor OVERWRITE = new
PropertyDescriptor.Builder()
+ .name("gcs-overwrite-object")
+ .displayName("Overwrite Object")
+ .description("If false, the upload to GCS will succeed only if
the object does not exist.")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final AllowableValue CD_INLINE = new AllowableValue(
+ "inline", "Inline", "Indicates that the object should be
loaded and rendered within the browser."
+ );
+
+ public static final AllowableValue CD_ATTACHMENT = new AllowableValue(
+ "attachment", "Attachment", "Indicates that the object should
be saved (using a Save As... dialog) rather " +
+ "than opened directly within the browser"
+ );
+
+ public static final PropertyDescriptor CONTENT_DISPOSITION_TYPE = new
PropertyDescriptor.Builder()
+ .name("gcs-content-disposition-type")
+ .displayName("Content Disposition Type")
+ .description("Type of RFC-6266 Content Disposition to be
attached to the object")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues(CD_INLINE, CD_ATTACHMENT)
+ .build();
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return ImmutableList.<PropertyDescriptor>builder()
+ .addAll(super.getSupportedPropertyDescriptors())
+ .add(KEY)
+ .add(CONTENT_TYPE)
+ .add(MD5)
+ .add(CRC32C)
+ .add(ACL)
+ .add(ENCRYPTION_KEY)
+ .add(OVERWRITE)
+ .add(CONTENT_DISPOSITION_TYPE)
+ .build();
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ final String bucket = context.getProperty(BUCKET)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final String key = context.getProperty(KEY)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final boolean overwrite =
context.getProperty(OVERWRITE).asBoolean();
+
+ final FlowFile ff = flowFile;
+ final String ffFilename =
ff.getAttributes().get(CoreAttributes.FILENAME.key());
+ final Map<String, String> attributes = new HashMap<>();
+
+ try {
+ final Storage storage = getCloudService();
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream rawIn) throws IOException {
+ try (final InputStream in = new
BufferedInputStream(rawIn)) {
+ final BlobId id = BlobId.of(bucket, key);
+ final BlobInfo.Builder blobInfoBuilder =
BlobInfo.newBuilder(id);
+ final List<Storage.BlobWriteOption>
blobWriteOptions = new ArrayList<>();
+
+ if (!overwrite) {
+
blobWriteOptions.add(Storage.BlobWriteOption.doesNotExist());
+ }
+
+ final String contentDispositionType =
context.getProperty(CONTENT_DISPOSITION_TYPE).getValue();
+ if (contentDispositionType != null) {
+
blobInfoBuilder.setContentDisposition(contentDispositionType + "; filename=" +
ffFilename);
+ }
+
+ final String contentType =
context.getProperty(CONTENT_TYPE)
+
.evaluateAttributeExpressions(ff).getValue();
+ if (contentType != null) {
+ blobInfoBuilder.setContentType(contentType);
+ }
+
+ final String md5 = context.getProperty(MD5)
+
.evaluateAttributeExpressions(ff).getValue();
+ if (md5 != null) {
+ blobInfoBuilder.setMd5(md5);
+
blobWriteOptions.add(Storage.BlobWriteOption.md5Match());
+ }
+
+ final String crc32c = context.getProperty(CRC32C)
+
.evaluateAttributeExpressions(ff).getValue();
+ if (crc32c != null) {
+ blobInfoBuilder.setCrc32c(crc32c);
+
blobWriteOptions.add(Storage.BlobWriteOption.crc32cMatch());
+ }
+
+ final String acl =
context.getProperty(ACL).getValue();
+ if (acl != null) {
+
blobWriteOptions.add(Storage.BlobWriteOption.predefinedAcl(
+ Storage.PredefinedAcl.valueOf(acl)
+ ));
+ }
+
+ final String encryptionKey =
context.getProperty(ENCRYPTION_KEY)
+
.evaluateAttributeExpressions(ff).getValue();
+ if (encryptionKey != null) {
+
blobWriteOptions.add(Storage.BlobWriteOption.encryptionKey(encryptionKey));
+ }
+
+ final HashMap<String, String> userMetadata = new
HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String>
entry : context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ final String value = context.getProperty(
+
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+ userMetadata.put(entry.getKey().getName(),
value);
+ }
+ }
+
+ if (!userMetadata.isEmpty()) {
+ blobInfoBuilder.setMetadata(userMetadata);
+ }
+
+ try {
+
+ //TODO: Take advantage of RestorableState to
checkpoint large blob uploads
--- End diff --
Same point as my earlier comment about TODO comments applies to this TODO as well.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---