turcsanyip commented on code in PR #6740:
URL: https://github.com/apache/nifi/pull/6740#discussion_r1039997166


##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.FileMetadata;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the Dropbox object.")
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+    public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Files that have been successfully written to Dropbox 
are transferred to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Files that could not be written to Dropbox for some 
reason are transferred to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("The path of the Dropbox folder to upload files to. "
+                    + "The folder will be created if it does not exist yet.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+            .defaultValue("/")
+            .build();
+
+    public static final PropertyDescriptor FILE_NAME = new 
PropertyDescriptor.Builder()
+            .name("file-name")
+            .displayName("Filename")
+            .description("The full name of the file to upload.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${filename}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor UPLOAD_CHUNK_SIZE = new 
PropertyDescriptor.Builder()
+            .name("upload-chunk-size")
+            .displayName("Upload Chunk Size")
+            .description("Defines the size of a chunk. Used when a FlowFile's 
size exceeds CHUNK_UPLOAD_LIMIT and content is uploaded in smaller chunks. "
+                    + "It is recommended to specify chunk size as multiples of 
4 MB.")
+            .defaultValue("8 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CHUNK_UPLOAD_LIMIT = new 
PropertyDescriptor.Builder()
+            .name("chunk-upload-limit")
+            .displayName("Chunk Upload Limit")
+            .description("The maximum size of the content which is uploaded at 
once. FlowFiles larger than this limit are uploaded in chunks. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("150 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CREDENTIAL_SERVICE,
+            FOLDER,
+            FILE_NAME,
+            UPLOAD_CHUNK_SIZE,
+            CHUNK_UPLOAD_LIMIT,

Review Comment:
   Thanks for adding a property for the limit. I'd suggest the following names 
and order of these two chunked upload related properties:
   ```
   Chunked Upload Threshold
   Chunked Upload Size
   ```



##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java:
##########
@@ -51,7 +49,7 @@
 @CapabilityDescription("Fetches files from Dropbox. Designed to be used in 
tandem with ListDropbox.")
 @WritesAttribute(attribute = "error.message", description = "When a FlowFile 
is routed to 'failure', this attribute is added indicating why the file could "
         + "not be fetched from Dropbox.")
-@SeeAlso(ListDropbox.class)
+@SeeAlso({PutDropbox.class, ListDropbox.class})
 @WritesAttributes(
         @WritesAttribute(attribute = FetchDropbox.ERROR_MESSAGE_ATTRIBUTE, 
description = "The error message returned by Dropbox when the fetch of a file 
fails."))

Review Comment:
   Just spotted that `@WritesAttribute` is duplicated with the same attribute 
name (`error.message`).



##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.FileMetadata;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the Dropbox object.")
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+    public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Files that have been successfully written to Dropbox 
are transferred to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Files that could not be written to Dropbox for some 
reason are transferred to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("The path of the Dropbox folder to upload files to. "
+                    + "The folder will be created if it does not exist yet.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+            .defaultValue("/")
+            .build();
+
+    public static final PropertyDescriptor FILE_NAME = new 
PropertyDescriptor.Builder()
+            .name("file-name")
+            .displayName("Filename")
+            .description("The full name of the file to upload.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${filename}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor UPLOAD_CHUNK_SIZE = new 
PropertyDescriptor.Builder()
+            .name("upload-chunk-size")
+            .displayName("Upload Chunk Size")
+            .description("Defines the size of a chunk. Used when a FlowFile's 
size exceeds CHUNK_UPLOAD_LIMIT and content is uploaded in smaller chunks. "
+                    + "It is recommended to specify chunk size as multiples of 
4 MB.")
+            .defaultValue("8 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CHUNK_UPLOAD_LIMIT = new 
PropertyDescriptor.Builder()
+            .name("chunk-upload-limit")
+            .displayName("Chunk Upload Limit")
+            .description("The maximum size of the content which is uploaded at 
once. FlowFiles larger than this limit are uploaded in chunks. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("150 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CREDENTIAL_SERVICE,
+            FOLDER,
+            FILE_NAME,
+            UPLOAD_CHUNK_SIZE,
+            CHUNK_UPLOAD_LIMIT,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxySpec.HTTP_AUTH)
+    ));
+
+
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    private DbxClientV2 dropboxApiClient;
+
+    private DbxUploader dbxUploader;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String folder = 
context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
+        final String filename = 
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final long uploadFileSizeLimit = 
context.getProperty(CHUNK_UPLOAD_LIMIT)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        FileMetadata uploadedFileMetadata = null;
+
+        long size = flowFile.getSize();
+        final String uploadPath = convertFolderName(folder) + "/" + filename;
+
+        try (final InputStream rawIn = session.read(flowFile)) {
+            if (size <= uploadFileSizeLimit) {
+                uploadedFileMetadata = 
createUploadUploader(uploadPath).uploadAndFinish(rawIn);
+            } else {
+                long chunkSize = context.getProperty(UPLOAD_CHUNK_SIZE)
+                        .asDataSize(DataUnit.B)
+                        .longValue();
+
+                uploadedFileMetadata = uploadLargeFileInChunks(rawIn, size, 
chunkSize, uploadPath);
+            }
+        } catch (Exception e) {
+            getLogger().error("Exception occurred while uploading file '{}' to 
Dropbox folder '{}'", filename, folder, e);
+        } finally {
+            dbxUploader.close();

Review Comment:
   It is not enough to close `dbxUploader` here because it is reassigned a 
couple of times in the called methods (like `uploadLargeFileInChunks`). 
Instead, the local `DbxUploader` objects need to be closed individually (and 
this global close is not needed).
   The best option is to use TWR blocks, like this:
   ```
           final String sessionId;
           try (UploadSessionStartUploader uploader = 
createUploadSessionStartUploader()) {
               sessionId = uploader.uploadAndFinish(rawIn, 
uploadChunkSize).getSessionId();
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to