turcsanyip commented on code in PR #6740:
URL: https://github.com/apache/nifi/pull/6740#discussion_r1048458190


##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import static java.lang.String.format;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.RateLimitException;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.UploadErrorException;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the Dropbox object.")
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+    public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String OVERWRITE_RESOLUTION = "overwrite";
+    public static final String FAIL_RESOLUTION = "fail";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Files that have been successfully written to Dropbox 
are transferred to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Files that could not be written to Dropbox for some 
reason are transferred to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("The path of the Dropbox folder to upload files to. "
+                    + "The folder will be created if it does not exist yet.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+            .defaultValue("/")
+            .build();
+
+    public static final PropertyDescriptor FILE_NAME = new 
PropertyDescriptor.Builder()
+            .name("file-name")
+            .displayName("Filename")
+            .description("The full name of the file to upload.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${filename}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the 
same name already exists in the specified Dropbox folder.")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, 
OVERWRITE_RESOLUTION)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-size")
+            .displayName("Chunked Upload Size")
+            .description("Defines the size of a chunk. Used when a FlowFile's 
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller 
chunks. "
+                    + "It is recommended to specify chunked upload size 
smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("8 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-threshold")
+            .displayName("Chunked Upload Threshold")
+            .description("The maximum size of the content which is uploaded at 
once. FlowFiles larger than this threshold are uploaded in chunks. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("150 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CREDENTIAL_SERVICE,
+            FOLDER,
+            FILE_NAME,
+            CONFLICT_RESOLUTION,
+            CHUNKED_UPLOAD_THRESHOLD,
+            CHUNKED_UPLOAD_SIZE,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxySpec.HTTP_AUTH)
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    private DbxClientV2 dropboxApiClient;
+
+    private DbxUploader<?, ?, ?> dbxUploader;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String folder = 
context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
+        final String filename = 
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final long chunkUploadThreshold = 
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+        boolean uploadErrorOccurred = false;
+
+        final long size = flowFile.getSize();
+        final String uploadPath = convertFolderName(folder) + "/" + filename;
+
+        try (final InputStream rawIn = session.read(flowFile)) {
+            try {
+                if (size <= chunkUploadThreshold) {
+                    try (UploadUploader uploader = 
createUploadUploader(uploadPath, conflictResolution)) {
+                        uploader.uploadAndFinish(rawIn);
+                    }
+                } else {
+                    uploadLargeFileInChunks(uploadPath, rawIn, size, 
uploadChunkSize, conflictResolution);
+                }
+            } catch (UploadErrorException e) {
+                handleUploadError(conflictResolution, uploadPath, e);
+            } catch (RateLimitException e) {
+                context.yield();
+                throw new ProcessException("Dropbox API rate limit exceeded 
while uploading file", e);
+            }
+        } catch (Exception e) {
+            getLogger().error("Exception occurred while uploading file '{}' to 
Dropbox folder '{}'", filename, folder, e);
+            uploadErrorOccurred = true;
+        } finally {
+            dbxUploader.close();
+        }
+
+        if (!uploadErrorOccurred) {
+            session.transfer(flowFile, REL_SUCCESS);
+        } else {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    @OnUnscheduled
+    public void shutdown() {
+        if (dbxUploader != null) {
+            dbxUploader.close();
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    private void handleUploadError(final String conflictResolution, final 
String uploadPath, final UploadErrorException e) throws UploadErrorException {
+        if (e.errorValue.isPath() && 
e.errorValue.getPathValue().getReason().isConflict()) {
+
+            if (IGNORE_RESOLUTION.equals(conflictResolution)) {
+                getLogger().info("File with the same name [{}] already exists. 
Remote file is not modified due to {} being set to '{}'.",
+                        uploadPath, CONFLICT_RESOLUTION.getDisplayName(), 
conflictResolution);
+                return;
+            } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+                throw new ProcessException(format("File with the same name 
[%s] already exists.", uploadPath), e);
+            }
+        }
+        throw new ProcessException(e);
+    }
+
+    private void uploadLargeFileInChunks(String path, InputStream rawIn, long 
size, long uploadChunkSize,  String conflictResolution) throws Exception {

Review Comment:
   Minor:
   ```suggestion
       private void uploadLargeFileInChunks(String path, InputStream rawIn, 
long size, long uploadChunkSize,  String conflictResolution) throws 
DbxException, IOException {
   ```



##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import static java.lang.String.format;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.RateLimitException;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.UploadErrorException;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the Dropbox object.")
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+    public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String OVERWRITE_RESOLUTION = "overwrite";
+    public static final String FAIL_RESOLUTION = "fail";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Files that have been successfully written to Dropbox 
are transferred to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Files that could not be written to Dropbox for some 
reason are transferred to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("The path of the Dropbox folder to upload files to. "
+                    + "The folder will be created if it does not exist yet.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+            .defaultValue("/")
+            .build();
+
+    public static final PropertyDescriptor FILE_NAME = new 
PropertyDescriptor.Builder()
+            .name("file-name")
+            .displayName("Filename")
+            .description("The full name of the file to upload.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${filename}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the 
same name already exists in the specified Dropbox folder.")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, 
OVERWRITE_RESOLUTION)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-size")
+            .displayName("Chunked Upload Size")
+            .description("Defines the size of a chunk. Used when a FlowFile's 
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller 
chunks. "
+                    + "It is recommended to specify chunked upload size 
smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("8 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-threshold")
+            .displayName("Chunked Upload Threshold")
+            .description("The maximum size of the content which is uploaded at 
once. FlowFiles larger than this threshold are uploaded in chunks. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("150 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CREDENTIAL_SERVICE,
+            FOLDER,
+            FILE_NAME,
+            CONFLICT_RESOLUTION,
+            CHUNKED_UPLOAD_THRESHOLD,
+            CHUNKED_UPLOAD_SIZE,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxySpec.HTTP_AUTH)
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    private DbxClientV2 dropboxApiClient;
+
+    private DbxUploader<?, ?, ?> dbxUploader;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String folder = 
context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
+        final String filename = 
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final long chunkUploadThreshold = 
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+        boolean uploadErrorOccurred = false;
+
+        final long size = flowFile.getSize();
+        final String uploadPath = convertFolderName(folder) + "/" + filename;
+
+        try (final InputStream rawIn = session.read(flowFile)) {
+            try {
+                if (size <= chunkUploadThreshold) {
+                    try (UploadUploader uploader = 
createUploadUploader(uploadPath, conflictResolution)) {
+                        uploader.uploadAndFinish(rawIn);
+                    }
+                } else {
+                    uploadLargeFileInChunks(uploadPath, rawIn, size, 
uploadChunkSize, conflictResolution);
+                }
+            } catch (UploadErrorException e) {
+                handleUploadError(conflictResolution, uploadPath, e);
+            } catch (RateLimitException e) {
+                context.yield();
+                throw new ProcessException("Dropbox API rate limit exceeded 
while uploading file", e);
+            }
+        } catch (Exception e) {
+            getLogger().error("Exception occurred while uploading file '{}' to 
Dropbox folder '{}'", filename, folder, e);
+            uploadErrorOccurred = true;
+        } finally {
+            dbxUploader.close();
+        }
+
+        if (!uploadErrorOccurred) {
+            session.transfer(flowFile, REL_SUCCESS);
+        } else {
+            session.transfer(flowFile, REL_FAILURE);
+        }

Review Comment:
   I would be a clearer code design without the `uploadErrorOccurred` flag. The 
success case can be moved into the `try` block and the failure into `catch`.
   
   Please also send a provenance event for success and penalize the FlowFile in 
case of failure.
   Example can be found in 
[PutAzureDataLakeStorage](https://github.com/apache/nifi/blob/ed6ba537249ea781402dc0ac7e341907ba5b0c94/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java#L144-L157).
   Please note, provenance event is only needed when the upload has really 
happened (so not when the file exists and conflict strategy is ignore).



##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import static java.lang.String.format;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.RateLimitException;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.UploadErrorException;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the Dropbox object.")
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+    public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String OVERWRITE_RESOLUTION = "overwrite";
+    public static final String FAIL_RESOLUTION = "fail";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Files that have been successfully written to Dropbox 
are transferred to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Files that could not be written to Dropbox for some 
reason are transferred to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("The path of the Dropbox folder to upload files to. "
+                    + "The folder will be created if it does not exist yet.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+            .defaultValue("/")
+            .build();
+
+    public static final PropertyDescriptor FILE_NAME = new 
PropertyDescriptor.Builder()
+            .name("file-name")
+            .displayName("Filename")
+            .description("The full name of the file to upload.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${filename}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the 
same name already exists in the specified Dropbox folder.")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, 
OVERWRITE_RESOLUTION)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-size")
+            .displayName("Chunked Upload Size")
+            .description("Defines the size of a chunk. Used when a FlowFile's 
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller 
chunks. "
+                    + "It is recommended to specify chunked upload size 
smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("8 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-threshold")
+            .displayName("Chunked Upload Threshold")
+            .description("The maximum size of the content which is uploaded at 
once. FlowFiles larger than this threshold are uploaded in chunks. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("150 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CREDENTIAL_SERVICE,
+            FOLDER,
+            FILE_NAME,
+            CONFLICT_RESOLUTION,
+            CHUNKED_UPLOAD_THRESHOLD,
+            CHUNKED_UPLOAD_SIZE,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxySpec.HTTP_AUTH)
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    private DbxClientV2 dropboxApiClient;
+
+    private DbxUploader<?, ?, ?> dbxUploader;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String folder = 
context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
+        final String filename = 
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final long chunkUploadThreshold = 
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+        boolean uploadErrorOccurred = false;
+
+        final long size = flowFile.getSize();
+        final String uploadPath = convertFolderName(folder) + "/" + filename;
+
+        try (final InputStream rawIn = session.read(flowFile)) {
+            try {
+                if (size <= chunkUploadThreshold) {
+                    try (UploadUploader uploader = 
createUploadUploader(uploadPath, conflictResolution)) {
+                        uploader.uploadAndFinish(rawIn);
+                    }
+                } else {
+                    uploadLargeFileInChunks(uploadPath, rawIn, size, 
uploadChunkSize, conflictResolution);
+                }
+            } catch (UploadErrorException e) {
+                handleUploadError(conflictResolution, uploadPath, e);
+            } catch (RateLimitException e) {
+                context.yield();
+                throw new ProcessException("Dropbox API rate limit exceeded 
while uploading file", e);
+            }
+        } catch (Exception e) {
+            getLogger().error("Exception occurred while uploading file '{}' to 
Dropbox folder '{}'", filename, folder, e);
+            uploadErrorOccurred = true;
+        } finally {
+            dbxUploader.close();

Review Comment:
   `dbxUploader` can be `null` at this point (eg. when connect error occurs and 
no `DbxUploader` initialized yet) which would lead to an additional NPE beyond 
the original exception. All `DbxUploader`s are in TWR blocks now so this 
`close()` call is not necessary anymore and can simply be removed.



##########
nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import static java.lang.String.format;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.RateLimitException;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.UploadErrorException;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the Dropbox object.")
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+    public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String OVERWRITE_RESOLUTION = "overwrite";
+    public static final String FAIL_RESOLUTION = "fail";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Files that have been successfully written to Dropbox 
are transferred to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Files that could not be written to Dropbox for some 
reason are transferred to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("The path of the Dropbox folder to upload files to. "
+                    + "The folder will be created if it does not exist yet.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+            .defaultValue("/")
+            .build();
+
+    public static final PropertyDescriptor FILE_NAME = new 
PropertyDescriptor.Builder()
+            .name("file-name")
+            .displayName("Filename")
+            .description("The full name of the file to upload.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("${filename}")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the 
same name already exists in the specified Dropbox folder.")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, 
OVERWRITE_RESOLUTION)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-size")
+            .displayName("Chunked Upload Size")
+            .description("Defines the size of a chunk. Used when a FlowFile's 
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller 
chunks. "
+                    + "It is recommended to specify chunked upload size 
smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("8 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new 
PropertyDescriptor.Builder()
+            .name("chunked-upload-threshold")
+            .displayName("Chunked Upload Threshold")
+            .description("The maximum size of the content which is uploaded at 
once. FlowFiles larger than this threshold are uploaded in chunks. "
+                    + "Maximum allowed value is 150 MB.")
+            .defaultValue("150 MB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(1, 
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CREDENTIAL_SERVICE,
+            FOLDER,
+            FILE_NAME,
+            CONFLICT_RESOLUTION,
+            CHUNKED_UPLOAD_THRESHOLD,
+            CHUNKED_UPLOAD_SIZE,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, 
ProxySpec.HTTP_AUTH)
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    private DbxClientV2 dropboxApiClient;
+
+    private DbxUploader<?, ?, ?> dbxUploader;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String folder = 
context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
+        final String filename = 
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final long chunkUploadThreshold = 
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
+                .asDataSize(DataUnit.B)
+                .longValue();
+
+        final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+        boolean uploadErrorOccurred = false;
+
+        final long size = flowFile.getSize();
+        final String uploadPath = convertFolderName(folder) + "/" + filename;
+
+        try (final InputStream rawIn = session.read(flowFile)) {
+            try {
+                if (size <= chunkUploadThreshold) {
+                    try (UploadUploader uploader = 
createUploadUploader(uploadPath, conflictResolution)) {
+                        uploader.uploadAndFinish(rawIn);
+                    }
+                } else {
+                    uploadLargeFileInChunks(uploadPath, rawIn, size, 
uploadChunkSize, conflictResolution);
+                }
+            } catch (UploadErrorException e) {
+                handleUploadError(conflictResolution, uploadPath, e);
+            } catch (RateLimitException e) {
+                context.yield();
+                throw new ProcessException("Dropbox API rate limit exceeded 
while uploading file", e);
+            }
+        } catch (Exception e) {
+            getLogger().error("Exception occurred while uploading file '{}' to 
Dropbox folder '{}'", filename, folder, e);
+            uploadErrorOccurred = true;
+        } finally {
+            dbxUploader.close();
+        }
+
+        if (!uploadErrorOccurred) {
+            session.transfer(flowFile, REL_SUCCESS);
+        } else {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    @OnUnscheduled
+    public void shutdown() {
+        if (dbxUploader != null) {
+            dbxUploader.close();
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }

Review Comment:
   Minor: could you please move it up near to `getRelationships()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to