nandorsoma commented on code in PR #6832:
URL: https://github.com/apache/nifi/pull/6832#discussion_r1071311474
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java:
##########
@@ -125,9 +145,17 @@ public void onTrigger(ProcessContext context,
ProcessSession session) throws Pro
String fileId =
context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
FlowFile outFlowFile = flowFile;
+ final long startNanos = System.nanoTime();
try {
outFlowFile = fetchFile(fileId, session, outFlowFile);
+ File fileMetadata = fetchFileMetadata(fileId);
Review Comment:
Minor, but `final` is missing here. The same applies to line 145. I know this part is
not modified in this PR, but do we know why we have an outFlowFile alias?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+public class GoogleDriveAttributes {
+
+ public static final String ID = "drive.id";
+ public static final String ID_DESC = "The id of the file";
+
+ public static final String FILENAME = "filename";
Review Comment:
I know this part is refactored and not new, but is the "drive" prefix
missing on purpose?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
Review Comment:
These conflict resolution types keep reappearing from time to time. Wouldn't it
make sense to have a global resolution type for them? We have
`AzureStorageConflictResolutionStrategy`, but it is only used in the v12
processor; even the ADLS one doesn't use it. If we don't want to do that now -
which I totally understand - I recommend using `replace` instead of `overwrite`
to be consistent with the existing convention.
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified
Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CREATE_FOLDER = new
PropertyDescriptor.Builder()
+ .name("create-folder")
+ .displayName("Create Folder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .description("Specifies whether to check if the folder exists and
to automatically create it if it does not. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold'.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ BASE_FOLDER_ID,
+ FOLDER_ID,
+ FOLDER_NAME,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CREATE_FOLDER,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to
Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google
Drive for some reason are transferred to this relationship.")
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ public static final String MULTIPART_UPLOAD_URL =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
Review Comment:
I find it interesting that the base URL is so different. Are we sure that
we cannot use GoogleDriveTrait.DRIVE_URL as the base URL?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified
Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CREATE_FOLDER = new
PropertyDescriptor.Builder()
+ .name("create-folder")
+ .displayName("Create Folder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .description("Specifies whether to check if the folder exists and
to automatically create it if it does not. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold'.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ BASE_FOLDER_ID,
+ FOLDER_ID,
+ FOLDER_NAME,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CREATE_FOLDER,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to
Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google
Drive for some reason are transferred to this relationship.")
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ public static final String MULTIPART_UPLOAD_URL =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ private volatile Drive driveService;
+ private Future<File> uploadedFileFuture;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ String folderId = null;
+ final String filename =
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String mimeType =
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ final String folderName =
context.getProperty(FOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ folderId = createFoldersAndGetParentId(context, flowFile);
+ final long startNanos = System.nanoTime();
+ final ExecutorService uploadExecutor =
Executors.newSingleThreadExecutor();
+ final long size = flowFile.getSize();
+
+ final long chunkUploadThreshold =
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final int uploadChunkSize =
context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .intValue();
+
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ Optional<File> alreadyExistingFile = checkFileExistence(filename,
folderId);
+ final File fileMetadata = alreadyExistingFile.isPresent() ?
alreadyExistingFile.get() : createMetadata(filename, folderId);
+
+ if (alreadyExistingFile.isPresent() &&
FAIL_RESOLUTION.equals(conflictResolution)) {
+ getLogger().error("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, FAIL_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ if (alreadyExistingFile.isPresent() &&
IGNORE_RESOLUTION.equals(conflictResolution)) {
+ getLogger().info("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, IGNORE_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_SUCCESS);
+ return;
+ }
+
+ final File uploadedFile;
+
+ try (final InputStream rawIn = session.read(flowFile)) {
+ final InputStreamContent mediaContent = new
InputStreamContent(mimeType, new BufferedInputStream(rawIn));
+ mediaContent.setLength(size);
+
+ uploadedFileFuture = uploadExecutor.submit(() -> {
+ final DriveRequest<File> driveRequest =
createDriveRequest(fileMetadata, mediaContent);
+
+ if (size > chunkUploadThreshold) {
+ return uploadFileInChunks(driveRequest, fileMetadata,
uploadChunkSize, mediaContent);
+ } else {
+ return driveRequest.execute();
+ }
+ });
+ uploadedFile = uploadedFileFuture.get();
+
+ } finally {
+ uploadExecutor.shutdown();
+ }
+
+ if (uploadedFile != null) {
+ final Map<String, String> attributes =
createAttributeMap(uploadedFile);
+ final String url = DRIVE_URL + uploadedFile.getId();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, url,
transferMillis);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (GoogleJsonResponseException e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Google Drive folder '{}'", filename,
+ getFolder(folderId, folderName), e);
+ handleExpectedError(session, flowFile, e);
+ } catch (Exception e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Google Drive folder '{}'", filename,
+ getFolder(folderId, folderName), e);
+
+ if (e.getCause() != null && e.getCause() instanceof
GoogleJsonResponseException) {
+ handleExpectedError(session, flowFile,
(GoogleJsonResponseException) e.getCause());
+ } else {
+ handleUnexpectedError(session, flowFile, e);
+ }
+ }
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
+
+ final HttpTransport httpTransport = new
ProxyAwareTransportFactory(proxyConfiguration).create();
+
+ driveService = createDriveService(context, httpTransport,
DriveScopes.DRIVE, DriveScopes.DRIVE_METADATA);
+ }
+
+ @OnUnscheduled
+ public void shutdown() {
+ if (uploadedFileFuture != null) {
+ uploadedFileFuture.cancel(true);
+ }
+ }
+
+ private FlowFile addAttributes(File file, FlowFile flowFile,
ProcessSession session) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ID, file.getId());
+ attributes.put(FILENAME, file.getName());
+ return session.putAllAttributes(flowFile, attributes);
+ }
+
+ private String getFolder(String folderId, String folderName) {
+ return folderId == null ? folderName : folderId;
+ }
+
+ private DriveRequest<File> createDriveRequest(File fileMetadata, final
InputStreamContent mediaContent) throws IOException {
+ if (fileMetadata.getId() == null) {
+ return driveService.files()
+ .create(fileMetadata, mediaContent)
+ .setFields("id, name, createdTime, mimeType, size");
+ } else {
+ return driveService.files()
+ .update(fileMetadata.getId(), new File(), mediaContent)
+ .setFields("id, name, createdTime, mimeType, size");
+ }
+ }
+
+ private File uploadFileInChunks(DriveRequest<File> driveRequest, File
fileMetadata, final int chunkSize, final InputStreamContent mediaContent)
throws IOException {
+ final HttpResponse response = driveRequest
+ .getMediaHttpUploader()
+ .setChunkSize(chunkSize)
+ .setDirectUploadEnabled(false)
+ .upload(new GenericUrl(MULTIPART_UPLOAD_URL));
+
+ if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) {
+ fileMetadata.setId(getUploadedFileId(response.getContent()));
+ fileMetadata.setMimeType(mediaContent.getType());
+ fileMetadata.setCreatedTime(new
DateTime(System.currentTimeMillis()));
+ fileMetadata.setSize(mediaContent.getLength());
+ return fileMetadata;
+ } else {
+ throw new ProcessException(format("Upload of file '%s' to folder
'%s' failed, HTTP error code: %d", fileMetadata.getName(),
fileMetadata.getId(), response.getStatusCode()));
+ }
+ }
+
+ private String getUploadedFileId(final InputStream content) {
+ final String contentAsString = new BufferedReader(new
InputStreamReader(content, UTF_8))
+ .lines()
+ .collect(joining("\n"));
+ return new JSONObject(contentAsString).getString("id");
+ }
+
+ private File handleFolderNames(String folderName, String parentFolderId,
boolean createFolder) throws IOException {
+ int index = folderName.indexOf("/");
+
+ if (index > 0 && index < folderName.length() - 1) {
+ String mainFolderName = folderName.substring(0, index);
Review Comment:
I think these local variables (`index` and `mainFolderName`, if I read it
correctly) can be declared `final`. The same applies to line 397.
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified
Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CREATE_FOLDER = new
PropertyDescriptor.Builder()
+ .name("create-folder")
+ .displayName("Create Folder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .description("Specifies whether to check if the folder exists and
to automatically create it if it does not. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold'.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ BASE_FOLDER_ID,
+ FOLDER_ID,
+ FOLDER_NAME,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CREATE_FOLDER,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to
Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google
Drive for some reason are transferred to this relationship.")
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ public static final String MULTIPART_UPLOAD_URL =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ private volatile Drive driveService;
+ private Future<File> uploadedFileFuture;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ String folderId = null;
+ final String filename =
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String mimeType =
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ final String folderName =
context.getProperty(FOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ folderId = createFoldersAndGetParentId(context, flowFile);
+ final long startNanos = System.nanoTime();
+ final ExecutorService uploadExecutor =
Executors.newSingleThreadExecutor();
+ final long size = flowFile.getSize();
+
+ final long chunkUploadThreshold =
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final int uploadChunkSize =
context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .intValue();
+
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ Optional<File> alreadyExistingFile = checkFileExistence(filename,
folderId);
Review Comment:
While this is a clean solution, it is not very performant: before every
upload, an additional request is sent to check whether the file already
exists. I am also concerned that the search behind that check might be slow
(though I am not sure about that). In other cases (Azure, for example), we
handle conflicts by catching the exceptions thrown by the upload itself. What
do you think? Would that approach make sense here?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/PutGoogleDriveTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.drive;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.singletonList;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveTrait.DRIVE_FOLDER_MIME_TYPE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import com.google.gson.JsonParseException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class PutGoogleDriveTest extends AbstractGoogleDriveTest{
+
+ @BeforeEach
+ protected void setUp() throws Exception {
+ final PutGoogleDrive testSubject = new PutGoogleDrive() {
+ @Override
+ public Drive createDriveService(ProcessContext context,
HttpTransport httpTransport, String... scopes) {
+ return mockDriverService;
+ }
+ };
+
+ testRunner = TestRunners.newTestRunner(testSubject);
+ super.setUp();
+ }
+
+ @Test
+ void testFileUploadFileNameFromProperty() throws Exception {
+ testRunner.setProperty(PutGoogleDrive.FILE_NAME, FILENAME);
+ testRunner.setProperty(PutGoogleDrive.FOLDER_ID, FOLDER_ID);
+ testRunner.setProperty(PutGoogleDrive.BASE_FOLDER_ID, BASE_FOLDER_ID);
+
+ mockFileUpload(createFile());
+ runWithFlowFile();
+
+ testRunner.assertAllFlowFilesTransferred(PutGoogleDrive.REL_SUCCESS,
1);
+ assertFlowFileAttributes(PutGoogleDrive.REL_SUCCESS);
+ assertProvenanceEvent(ProvenanceEventType.SEND);
+ }
+
+ @Test
+ void testFileUploadFileNameFromFlowFileAttribute() throws Exception {
+ testRunner.setProperty(PutGoogleDrive.FOLDER_ID, FOLDER_ID);
+ testRunner.setProperty(PutGoogleDrive.BASE_FOLDER_ID, BASE_FOLDER_ID);
+
+ mockFileUpload(createFile());
+
+ final MockFlowFile mockFlowFile = getMockFlowFile();
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", FILENAME);
Review Comment:
`filename` is a pretty common attribute. Don't we have a constant for it
(for example, `CoreAttributes.FILENAME.key()`)?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
Review Comment:
Minor again, but it would be good to have a naming convention for property
names. Most of the time I see `.` or `-` used as the word separator, but it
would be good if we had a rule for that.
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
Review Comment:
Is this comment intended to be here?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified
Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CREATE_FOLDER = new
PropertyDescriptor.Builder()
+ .name("create-folder")
+ .displayName("Create Folder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .description("Specifies whether to check if the folder exists and
to automatically create it if it does not. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold'.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ BASE_FOLDER_ID,
+ FOLDER_ID,
+ FOLDER_NAME,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CREATE_FOLDER,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to
Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google
Drive for some reason are transferred to this relationship.")
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ public static final String MULTIPART_UPLOAD_URL =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ private volatile Drive driveService;
+ private Future<File> uploadedFileFuture;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ String folderId = null;
+ final String filename =
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String mimeType =
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ final String folderName =
context.getProperty(FOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ folderId = createFoldersAndGetParentId(context, flowFile);
+ final long startNanos = System.nanoTime();
+ final ExecutorService uploadExecutor =
Executors.newSingleThreadExecutor();
+ final long size = flowFile.getSize();
+
+ final long chunkUploadThreshold =
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final int uploadChunkSize =
context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .intValue();
+
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ Optional<File> alreadyExistingFile = checkFileExistence(filename,
folderId);
+ final File fileMetadata = alreadyExistingFile.isPresent() ?
alreadyExistingFile.get() : createMetadata(filename, folderId);
+
+ if (alreadyExistingFile.isPresent() &&
FAIL_RESOLUTION.equals(conflictResolution)) {
+ getLogger().error("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, FAIL_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ if (alreadyExistingFile.isPresent() &&
IGNORE_RESOLUTION.equals(conflictResolution)) {
+ getLogger().info("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, IGNORE_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_SUCCESS);
+ return;
+ }
+
+ final File uploadedFile;
+
+ try (final InputStream rawIn = session.read(flowFile)) {
+ final InputStreamContent mediaContent = new
InputStreamContent(mimeType, new BufferedInputStream(rawIn));
Review Comment:
I think we need to close the BufferedInputStream as well. Also, there is
another overload of the `read` method that takes an `inputStreamCallback`.
It seems to be about 50-50 which of the two gets used, but the callback
variant adds a lot of logic on top of reading data from a FlowFile. Do we
know which one is recommended and why?
@turcsanyip maybe?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
Review Comment:
Minor: `.Builder()` appears to have been wrapped onto a new line here.
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.services.drive.Drive;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class FetchGoogleDriveTest extends AbstractGoogleDriveTest {
+
+ @BeforeEach
+ protected void setUp() throws Exception {
+ final FetchGoogleDrive testSubject = new FetchGoogleDrive() {
+ @Override
+ public Drive createDriveService(ProcessContext context,
HttpTransport httpTransport, String... scopes) {
+ return mockDriverService;
+ }
+ };
+
+ testRunner = TestRunners.newTestRunner(testSubject);
+ super.setUp();
+ }
+
+ @Test
+ void testFileFetchFileNameFromProperty() throws IOException {
+ testRunner.setProperty(FetchGoogleDrive.FILE_ID, FILE_ID);
+
+ mockFileDownload(FILE_ID);
+ runWithFlowFile();
+
+ testRunner.assertAllFlowFilesTransferred(FetchGoogleDrive.REL_SUCCESS,
1);
+ assertFlowFileAttributes(FetchGoogleDrive.REL_SUCCESS);
+ assertProvenanceEvent(ProvenanceEventType.FETCH);
+ }
+
+ @Test
+ void testFetchFileNameFromFlowFileAttribute() throws Exception {
+ final MockFlowFile mockFlowFile = new MockFlowFile(0);
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("drive.id", FILE_ID);
Review Comment:
GoogleDriveAttributes.ID can be referenced here.
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified
Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CREATE_FOLDER = new
PropertyDescriptor.Builder()
+ .name("create-folder")
+ .displayName("Create Folder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .description("Specifies whether to check if the folder exists and
to automatically create it if it does not. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold'.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ BASE_FOLDER_ID,
+ FOLDER_ID,
+ FOLDER_NAME,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CREATE_FOLDER,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to
Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google
Drive for some reason are transferred to this relationship.")
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ public static final String MULTIPART_UPLOAD_URL =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ private volatile Drive driveService;
+ private Future<File> uploadedFileFuture;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ String folderId = null;
+ final String filename =
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String mimeType =
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ final String folderName =
context.getProperty(FOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ folderId = createFoldersAndGetParentId(context, flowFile);
+ final long startNanos = System.nanoTime();
+ final ExecutorService uploadExecutor =
Executors.newSingleThreadExecutor();
+ final long size = flowFile.getSize();
+
+ final long chunkUploadThreshold =
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final int uploadChunkSize =
context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .intValue();
+
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ Optional<File> alreadyExistingFile = checkFileExistence(filename,
folderId);
+ final File fileMetadata = alreadyExistingFile.isPresent() ?
alreadyExistingFile.get() : createMetadata(filename, folderId);
+
+ if (alreadyExistingFile.isPresent() &&
FAIL_RESOLUTION.equals(conflictResolution)) {
+ getLogger().error("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, FAIL_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ if (alreadyExistingFile.isPresent() &&
IGNORE_RESOLUTION.equals(conflictResolution)) {
+ getLogger().info("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, IGNORE_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_SUCCESS);
+ return;
+ }
+
+ final File uploadedFile;
+
+ try (final InputStream rawIn = session.read(flowFile)) {
+ final InputStreamContent mediaContent = new
InputStreamContent(mimeType, new BufferedInputStream(rawIn));
+ mediaContent.setLength(size);
+
+ uploadedFileFuture = uploadExecutor.submit(() -> {
+ final DriveRequest<File> driveRequest =
createDriveRequest(fileMetadata, mediaContent);
+
+ if (size > chunkUploadThreshold) {
+ return uploadFileInChunks(driveRequest, fileMetadata,
uploadChunkSize, mediaContent);
+ } else {
+ return driveRequest.execute();
+ }
+ });
+ uploadedFile = uploadedFileFuture.get();
+
+ } finally {
+ uploadExecutor.shutdown();
+ }
+
+ if (uploadedFile != null) {
+ final Map<String, String> attributes =
createAttributeMap(uploadedFile);
+ final String url = DRIVE_URL + uploadedFile.getId();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, url,
transferMillis);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (GoogleJsonResponseException e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Google Drive folder '{}'", filename,
+ getFolder(folderId, folderName), e);
+ handleExpectedError(session, flowFile, e);
+ } catch (Exception e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Google Drive folder '{}'", filename,
+ getFolder(folderId, folderName), e);
+
+ if (e.getCause() != null && e.getCause() instanceof
GoogleJsonResponseException) {
+ handleExpectedError(session, flowFile,
(GoogleJsonResponseException) e.getCause());
+ } else {
+ handleUnexpectedError(session, flowFile, e);
+ }
+ }
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
+
+ final HttpTransport httpTransport = new
ProxyAwareTransportFactory(proxyConfiguration).create();
+
+ driveService = createDriveService(context, httpTransport,
DriveScopes.DRIVE, DriveScopes.DRIVE_METADATA);
+ }
+
+ @OnUnscheduled
+ public void shutdown() {
+ if (uploadedFileFuture != null) {
+ uploadedFileFuture.cancel(true);
+ }
+ }
+
+ private FlowFile addAttributes(File file, FlowFile flowFile,
ProcessSession session) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ID, file.getId());
+ attributes.put(FILENAME, file.getName());
+ return session.putAllAttributes(flowFile, attributes);
+ }
+
+ private String getFolder(String folderId, String folderName) {
+ return folderId == null ? folderName : folderId;
+ }
+
+ private DriveRequest<File> createDriveRequest(File fileMetadata, final
InputStreamContent mediaContent) throws IOException {
+ if (fileMetadata.getId() == null) {
+ return driveService.files()
+ .create(fileMetadata, mediaContent)
+ .setFields("id, name, createdTime, mimeType, size");
+ } else {
+ return driveService.files()
+ .update(fileMetadata.getId(), new File(), mediaContent)
+ .setFields("id, name, createdTime, mimeType, size");
+ }
+ }
+
+ private File uploadFileInChunks(DriveRequest<File> driveRequest, File
fileMetadata, final int chunkSize, final InputStreamContent mediaContent)
throws IOException {
+ final HttpResponse response = driveRequest
+ .getMediaHttpUploader()
+ .setChunkSize(chunkSize)
+ .setDirectUploadEnabled(false)
+ .upload(new GenericUrl(MULTIPART_UPLOAD_URL));
+
+ if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) {
+ fileMetadata.setId(getUploadedFileId(response.getContent()));
+ fileMetadata.setMimeType(mediaContent.getType());
+ fileMetadata.setCreatedTime(new
DateTime(System.currentTimeMillis()));
+ fileMetadata.setSize(mediaContent.getLength());
+ return fileMetadata;
+ } else {
+ throw new ProcessException(format("Upload of file '%s' to folder
'%s' failed, HTTP error code: %d", fileMetadata.getName(),
fileMetadata.getId(), response.getStatusCode()));
+ }
+ }
+
+ private String getUploadedFileId(final InputStream content) {
+ final String contentAsString = new BufferedReader(new
InputStreamReader(content, UTF_8))
+ .lines()
+ .collect(joining("\n"));
+ return new JSONObject(contentAsString).getString("id");
+ }
+
+ private File handleFolderNames(String folderName, String parentFolderId,
boolean createFolder) throws IOException {
Review Comment:
Can we give this method a more descriptive name that reflects what it actually does?
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */
+
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.joining;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC;
+import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.InputStreamContent;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.DriveRequest;
+import com.google.api.services.drive.DriveScopes;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.drive.model.FileList;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.json.JSONObject;
+
+@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "drive", "storage", "put"})
+@CapabilityDescription("Puts content to a Google Drive Folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Google Drive object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC),
+ @WritesAttribute(attribute = ERROR_CODE, description =
ERROR_CODE_DESC),
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC)})
+public class PutGoogleDrive extends AbstractProcessor implements
GoogleDriveTrait {
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final PropertyDescriptor FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("folder-id")
+ .displayName("Folder ID")
+ .description("The ID of the folder to upload files to. In case
neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the
folder " +
+ " defined by 'Base Folder ID'." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor FOLDER_NAME = new
PropertyDescriptor.Builder()
+ .name("folder-name")
+ .displayName("Folder Name")
+ .description("The name (path) of the folder to upload files to. In
case 'Folder ID' is also defined, the ID will be used to identify the folder.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_FOLDER_ID = new
PropertyDescriptor.Builder()
+ .name("base-folder-id")
+ .displayName("Base Folder ID")
+ .description("The ID of the shared folder. In case neither 'Folder
ID' nor 'Folder Name' is specified, file is uploaded to this folder." +
+ " For how to setup access to Google Drive and obtain
Folder ID please see additional details.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor
+ .Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The name of the file to upload to the specified
Google Drive folder.")
+ .required(true)
+ .defaultValue("${filename}")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CREATE_FOLDER = new
PropertyDescriptor.Builder()
+ .name("create-folder")
+ .displayName("Create Folder")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .description("Specifies whether to check if the folder exists and
to automatically create it if it does not. " +
+ "Permission to list folders is required. ")
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Google Drive folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold'.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("10 MB")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks.")
+ .defaultValue("100 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(asList(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ BASE_FOLDER_ID,
+ FOLDER_ID,
+ FOLDER_NAME,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CREATE_FOLDER,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxyAwareTransportFactory.PROXY_SPECS)
+ ));
+
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to
Google Drive are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Google
Drive for some reason are transferred to this relationship.")
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+
+ public static final String MULTIPART_UPLOAD_URL =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ private volatile Drive driveService;
+ private Future<File> uploadedFileFuture;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ String folderId = null;
+ final String filename =
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String mimeType =
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ final String folderName =
context.getProperty(FOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ folderId = createFoldersAndGetParentId(context, flowFile);
+ final long startNanos = System.nanoTime();
+ final ExecutorService uploadExecutor =
Executors.newSingleThreadExecutor();
+ final long size = flowFile.getSize();
+
+ final long chunkUploadThreshold =
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final int uploadChunkSize =
context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .intValue();
+
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ Optional<File> alreadyExistingFile = checkFileExistence(filename,
folderId);
+ final File fileMetadata = alreadyExistingFile.isPresent() ?
alreadyExistingFile.get() : createMetadata(filename, folderId);
+
+ if (alreadyExistingFile.isPresent() &&
FAIL_RESOLUTION.equals(conflictResolution)) {
+ getLogger().error("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, FAIL_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ if (alreadyExistingFile.isPresent() &&
IGNORE_RESOLUTION.equals(conflictResolution)) {
+ getLogger().info("File '{}' already exists in folder '{}',
conflict resolution is '{}' ", filename, folderId, IGNORE_RESOLUTION);
+ flowFile = addAttributes(alreadyExistingFile.get(), flowFile,
session);
+ session.transfer(flowFile, REL_SUCCESS);
+ return;
+ }
+
+ final File uploadedFile;
+
+ try (final InputStream rawIn = session.read(flowFile)) {
+ final InputStreamContent mediaContent = new
InputStreamContent(mimeType, new BufferedInputStream(rawIn));
+ mediaContent.setLength(size);
+
+ uploadedFileFuture = uploadExecutor.submit(() -> {
+ final DriveRequest<File> driveRequest =
createDriveRequest(fileMetadata, mediaContent);
+
+ if (size > chunkUploadThreshold) {
+ return uploadFileInChunks(driveRequest, fileMetadata,
uploadChunkSize, mediaContent);
+ } else {
+ return driveRequest.execute();
+ }
+ });
+ uploadedFile = uploadedFileFuture.get();
+
+ } finally {
+ uploadExecutor.shutdown();
+ }
+
+ if (uploadedFile != null) {
+ final Map<String, String> attributes =
createAttributeMap(uploadedFile);
+ final String url = DRIVE_URL + uploadedFile.getId();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, url,
transferMillis);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (GoogleJsonResponseException e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Google Drive folder '{}'", filename,
+ getFolder(folderId, folderName), e);
+ handleExpectedError(session, flowFile, e);
+ } catch (Exception e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Google Drive folder '{}'", filename,
+ getFolder(folderId, folderName), e);
+
+ if (e.getCause() != null && e.getCause() instanceof
GoogleJsonResponseException) {
+ handleExpectedError(session, flowFile,
(GoogleJsonResponseException) e.getCause());
+ } else {
+ handleUnexpectedError(session, flowFile, e);
+ }
+ }
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
+
+ final HttpTransport httpTransport = new
ProxyAwareTransportFactory(proxyConfiguration).create();
+
+ driveService = createDriveService(context, httpTransport,
DriveScopes.DRIVE, DriveScopes.DRIVE_METADATA);
+ }
+
+ @OnUnscheduled
+ public void shutdown() {
+ if (uploadedFileFuture != null) {
+ uploadedFileFuture.cancel(true);
+ }
+ }
+
+ private FlowFile addAttributes(File file, FlowFile flowFile,
ProcessSession session) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ID, file.getId());
+ attributes.put(FILENAME, file.getName());
+ return session.putAllAttributes(flowFile, attributes);
+ }
+
+ private String getFolder(String folderId, String folderName) {
+ return folderId == null ? folderName : folderId;
+ }
+
+ private DriveRequest<File> createDriveRequest(File fileMetadata, final
InputStreamContent mediaContent) throws IOException {
+ if (fileMetadata.getId() == null) {
+ return driveService.files()
+ .create(fileMetadata, mediaContent)
+ .setFields("id, name, createdTime, mimeType, size");
+ } else {
+ return driveService.files()
+ .update(fileMetadata.getId(), new File(), mediaContent)
+ .setFields("id, name, createdTime, mimeType, size");
+ }
+ }
+
+ private File uploadFileInChunks(DriveRequest<File> driveRequest, File
fileMetadata, final int chunkSize, final InputStreamContent mediaContent)
throws IOException {
+ final HttpResponse response = driveRequest
+ .getMediaHttpUploader()
+ .setChunkSize(chunkSize)
+ .setDirectUploadEnabled(false)
+ .upload(new GenericUrl(MULTIPART_UPLOAD_URL));
+
+ if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) {
+ fileMetadata.setId(getUploadedFileId(response.getContent()));
+ fileMetadata.setMimeType(mediaContent.getType());
+ fileMetadata.setCreatedTime(new
DateTime(System.currentTimeMillis()));
+ fileMetadata.setSize(mediaContent.getLength());
+ return fileMetadata;
+ } else {
+ throw new ProcessException(format("Upload of file '%s' to folder
'%s' failed, HTTP error code: %d", fileMetadata.getName(),
fileMetadata.getId(), response.getStatusCode()));
+ }
+ }
+
+ private String getUploadedFileId(final InputStream content) {
+ final String contentAsString = new BufferedReader(new
InputStreamReader(content, UTF_8))
+ .lines()
+ .collect(joining("\n"));
+ return new JSONObject(contentAsString).getString("id");
+ }
+
+ private File handleFolderNames(String folderName, String parentFolderId,
boolean createFolder) throws IOException {
+ int index = folderName.indexOf("/");
+
+ if (index > 0 && index < folderName.length() - 1) {
+ String mainFolderName = folderName.substring(0, index);
+ String subFolders = folderName.substring(index + 1);
+ final File mainFolder = getOrCreateFolder(mainFolderName,
parentFolderId, createFolder);
+ return handleFolderNames(subFolders, mainFolder.getId(),
createFolder);
+ } else {
+ return getOrCreateFolder(folderName, parentFolderId, createFolder);
+ }
+ }
+
+ private File getOrCreateFolder(String folderName, String parentFolderId,
boolean createFolder) throws IOException {
+ final Optional<File> existingFolder = checkFolderExistence(folderName,
parentFolderId);
+
+ if (existingFolder.isPresent()) {
+ return existingFolder.get();
+ }
+
+ if (createFolder) {
+ getLogger().debug("Create folder " + folderName + " parent id: " +
parentFolderId);
+ final File folderMetadata = createMetadata(folderName,
parentFolderId);
+ folderMetadata.setMimeType(DRIVE_FOLDER_MIME_TYPE);
+
+ return driveService.files()
+ .create(folderMetadata)
+ .setFields("id, parents")
+ .execute();
+ } else {
+ throw new ProcessException(format("The specified (sub)folder '%s'
does not exists and '%s' is false.", folderName,
CREATE_FOLDER.getDisplayName()));
+ }
+ }
+
+ private File createMetadata(final String name, final String parentId) {
+ final File metadata = new File();
+ metadata.setName(name);
+ metadata.setParents(singletonList(parentId));
+ return metadata;
+ }
+
+ private String createFoldersAndGetParentId(final ProcessContext context,
FlowFile flowFile) throws IOException {
+ String folderId =
context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue();
Review Comment:
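Missing `final` modifiers, e.g. on the `flowFile` parameter and (if it is never reassigned) the `folderId` local variable.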
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]