krisztina-zsihovszki commented on code in PR #6832: URL: https://github.com/apache/nifi/pull/6832#discussion_r1073749574
########## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java: ########## @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.drive; + +/* + * This processor uploads objects to Google Drive. 
+ */ + +import static java.lang.String.format; +import static java.lang.String.valueOf; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.joining; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC; +import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.InputStreamContent; +import com.google.api.client.util.DateTime; +import 
com.google.api.services.drive.Drive; +import com.google.api.services.drive.DriveRequest; +import com.google.api.services.drive.DriveScopes; +import com.google.api.services.drive.model.File; +import com.google.api.services.drive.model.FileList; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import 
org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.json.JSONObject; + +@SeeAlso({ListGoogleDrive.class, FetchGoogleDrive.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"google", "drive", "storage", "put"}) +@CapabilityDescription("Puts content to a Google Drive Folder.") +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the Google Drive object.") +@WritesAttributes({ + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC), + @WritesAttribute(attribute = MIME_TYPE, description = MIME_TYPE_DESC), + @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC), + @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)}) +public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrait { + + public static final String IGNORE_RESOLUTION = "ignore"; + public static final String OVERWRITE_RESOLUTION = "overwrite"; + public static final String FAIL_RESOLUTION = "fail"; + + public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder() + .name("folder-id") + .displayName("Folder ID") + .description("The ID of the folder to upload files to. In case neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to the folder " + + " defined by 'Base Folder ID'." 
+ + " For how to setup access to Google Drive and obtain Folder ID please see additional details.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor FOLDER_NAME = new PropertyDescriptor.Builder() + .name("folder-name") + .displayName("Folder Name") + .description("The name (path) of the folder to upload files to. In case 'Folder ID' is also defined, the ID will be used to identify the folder.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .build(); + + public static final PropertyDescriptor BASE_FOLDER_ID = new PropertyDescriptor.Builder() + .name("base-folder-id") + .displayName("Base Folder ID") + .description("The ID of the shared folder. In case neither 'Folder ID' nor 'Folder Name' is specified, file is uploaded to this folder." + + " For how to setup access to Google Drive and obtain Folder ID please see additional details.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor + .Builder() + .name("file-name") + .displayName("Filename") + .description("The name of the file to upload to the specified Google Drive folder.") + .required(true) + .defaultValue("${filename}") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CREATE_FOLDER = new PropertyDescriptor.Builder() + .name("create-folder") + .displayName("Create Folder") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + 
.defaultValue("false") + .description("Specifies whether to check if the folder exists and to automatically create it if it does not. " + + "Permission to list folders is required. ") + .build(); + + public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("conflict-resolution-strategy") + .displayName("Conflict Resolution Strategy") + .description("Indicates what should happen when a file with the same name already exists in the specified Google Drive folder.") + .required(true) + .defaultValue(FAIL_RESOLUTION) + .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, OVERWRITE_RESOLUTION) + .build(); + + public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder() + .name("chunked-upload-size") + .displayName("Chunked Upload Size") + .description("Defines the size of a chunk. Used when a FlowFile's size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller chunks. " + + "It is recommended to specify chunked upload size smaller than 'Chunked Upload Threshold'.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("10 MB") + .required(false) + .build(); + + public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new PropertyDescriptor.Builder() + .name("chunked-upload-threshold") + .displayName("Chunked Upload Threshold") + .description("The maximum size of the content which is uploaded at once. 
FlowFiles larger than this threshold are uploaded in chunks.") + .defaultValue("100 MB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .required(false) + .build(); + + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(asList( + GCP_CREDENTIALS_PROVIDER_SERVICE, + BASE_FOLDER_ID, + FOLDER_ID, + FOLDER_NAME, + FILE_NAME, + CONFLICT_RESOLUTION, + CREATE_FOLDER, + CHUNKED_UPLOAD_THRESHOLD, + CHUNKED_UPLOAD_SIZE, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) + )); + + public static final Relationship REL_SUCCESS = + new Relationship.Builder() + .name("success") + .description("Files that have been successfully written to Google Drive are transferred to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = + new Relationship.Builder() + .name("failure") + .description("Files that could not be written to Google Drive for some reason are transferred to this relationship.") + .build(); + + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(asList( + REL_SUCCESS, + REL_FAILURE + ))); + + public static final String MULTIPART_UPLOAD_URL = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"; + + private volatile Drive driveService; + private Future<File> uploadedFileFuture; + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + String folderId = null; + final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); 
+ final String folderName = context.getProperty(FOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue(); + + try { + folderId = createFoldersAndGetParentId(context, flowFile); + final long startNanos = System.nanoTime(); + final ExecutorService uploadExecutor = Executors.newSingleThreadExecutor(); + final long size = flowFile.getSize(); + + final long chunkUploadThreshold = context.getProperty(CHUNKED_UPLOAD_THRESHOLD) + .asDataSize(DataUnit.B) + .longValue(); + + final int uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE) + .asDataSize(DataUnit.B) + .intValue(); + + final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); + + Optional<File> alreadyExistingFile = checkFileExistence(filename, folderId); Review Comment: You are right, that is a useful approach in general, but it does not work in the Google Drive case. Google Drive allows several folders and files to have the same name: it does not throw an error if you create an object with an already existing name; it simply creates another object with the same name (with a different ID, of course). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
