This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1d5a1bff08 NIFI-10868 PutDropbox processor
1d5a1bff08 is described below
commit 1d5a1bff0857542ac8ade18169cf3e4b9a5c3dd2
Author: krisztina-zsihovszki <[email protected]>
AuthorDate: Wed Nov 30 14:39:51 2022 +0100
NIFI-10868 PutDropbox processor
This closes #6740.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi/processors/dropbox/DropboxAttributes.java | 40 +++
.../nifi/processors/dropbox/DropboxFileInfo.java | 14 +-
.../dropbox/DropboxFlowFileAttribute.java | 12 +-
.../nifi/processors/dropbox/DropboxTrait.java | 42 ++-
.../nifi/processors/dropbox/FetchDropbox.java | 83 +++--
.../nifi/processors/dropbox/ListDropbox.java | 48 +--
.../apache/nifi/processors/dropbox/PutDropbox.java | 358 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../processors/dropbox/AbstractDropboxTest.java | 128 ++++++++
.../nifi/processors/dropbox/FetchDropboxTest.java | 88 ++---
.../nifi/processors/dropbox/ListDropboxTest.java | 97 ++----
.../nifi/processors/dropbox/PutDropboxIT.java | 153 +++++++++
.../nifi/processors/dropbox/PutDropboxTest.java | 333 +++++++++++++++++++
13 files changed, 1197 insertions(+), 200 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java
new file mode 100644
index 0000000000..29d9e4410c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.dropbox;
+
+public class DropboxAttributes {
+ public static final String ID = "dropbox.id";
+ public static final String ID_DESC = "The Dropbox identifier of the file";
+
+ public static final String PATH = "path";
+ public static final String PATH_DESC = "The folder path where the file is
located";
+
+ public static final String FILENAME = "filename";
+ public static final String FILENAME_DESC = "The name of the file";
+
+ public static final String SIZE = "dropbox.size";
+ public static final String SIZE_DESC = "The size of the file";
+
+ public static final String TIMESTAMP = "dropbox.timestamp";
+ public static final String TIMESTAMP_DESC = "The server modified time,
when the file was uploaded to Dropbox";
+
+ public static final String REVISION = "dropbox.revision";
+ public static final String REVISION_DESC = "Revision of the file";
+
+ public static final String ERROR_MESSAGE = "error.message";
+ public static final String ERROR_MESSAGE_DESC = "The error message
returned by Dropbox when the fetch of a file fails";
+}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java
index 22eda85a51..0d1dfc0018 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.dropbox;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -31,13 +38,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
public class DropboxFileInfo implements ListableEntity {
- public static final String ID = "dropbox.id";
- public static final String PATH = "path";
- public static final String FILENAME = "filename";
- public static final String SIZE = "dropbox.size";
- public static final String TIMESTAMP = "dropbox.timestamp";
- public static final String REVISION = "dropbox.revision";
-
private static final RecordSchema SCHEMA;
static {
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java
index 0a5898cb2a..12e7bd4d6e 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java
@@ -19,12 +19,12 @@ package org.apache.nifi.processors.dropbox;
import java.util.function.Function;
public enum DropboxFlowFileAttribute {
- ID(DropboxFileInfo.ID, DropboxFileInfo::getId),
- PATH(DropboxFileInfo.PATH, DropboxFileInfo::getPath),
- FILENAME(DropboxFileInfo.FILENAME, DropboxFileInfo::getName),
- SIZE(DropboxFileInfo.SIZE, fileInfo -> String.valueOf(fileInfo.getSize())),
- TIMESTAMP(DropboxFileInfo.TIMESTAMP, fileInfo ->
String.valueOf(fileInfo.getTimestamp())),
- REVISION(DropboxFileInfo.REVISION, DropboxFileInfo::getRevision);
+ ID(DropboxAttributes.ID, DropboxFileInfo::getId),
+ PATH(DropboxAttributes.PATH, DropboxFileInfo::getPath),
+ FILENAME(DropboxAttributes.FILENAME, DropboxFileInfo::getName),
+ SIZE(DropboxAttributes.SIZE, fileInfo ->
String.valueOf(fileInfo.getSize())),
+ TIMESTAMP(DropboxAttributes.TIMESTAMP, fileInfo ->
String.valueOf(fileInfo.getTimestamp())),
+ REVISION(DropboxAttributes.REVISION, DropboxFileInfo::getRevision);
private final String name;
private final Function<DropboxFileInfo, String> fromFileInfo;
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java
index 6e8548787a..32d779c3bf 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java
@@ -16,12 +16,18 @@
*/
package org.apache.nifi.processors.dropbox;
+import static java.lang.String.format;
+import static java.lang.String.valueOf;
+
import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.http.HttpRequestor;
import com.dropbox.core.http.OkHttp3Requestor;
import com.dropbox.core.oauth.DbxCredential;
import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.FileMetadata;
import java.net.Proxy;
+import java.util.HashMap;
+import java.util.Map;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import org.apache.nifi.components.PropertyDescriptor;
@@ -32,6 +38,8 @@ import org.apache.nifi.proxy.ProxyConfiguration;
public interface DropboxTrait {
+ String DROPBOX_HOME_URL = "https://www.dropbox.com/home";
+
PropertyDescriptor CREDENTIAL_SERVICE = new PropertyDescriptor.Builder()
.name("dropbox-credential-service")
.displayName("Dropbox Credential Service")
@@ -41,9 +49,10 @@ public interface DropboxTrait {
.required(true)
.build();
-
- default DbxClientV2 getDropboxApiClient(ProcessContext context,
ProxyConfiguration proxyConfiguration, String clientId) {
- OkHttpClient.Builder okHttpClientBuilder =
OkHttp3Requestor.defaultOkHttpClientBuilder();
+ default DbxClientV2 getDropboxApiClient(ProcessContext context, String
identifier) {
+ final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
+ final String dropboxClientId = format("%s-%s",
getClass().getSimpleName(), identifier);
+ final OkHttpClient.Builder okHttpClientBuilder =
OkHttp3Requestor.defaultOkHttpClientBuilder();
if (!Proxy.Type.DIRECT.equals(proxyConfiguration.getProxyType())) {
okHttpClientBuilder.proxy(proxyConfiguration.createProxy());
@@ -58,16 +67,37 @@ public interface DropboxTrait {
}
}
- HttpRequestor httpRequestor = new
OkHttp3Requestor(okHttpClientBuilder.build());
- DbxRequestConfig config = DbxRequestConfig.newBuilder(clientId)
+ final HttpRequestor httpRequestor = new
OkHttp3Requestor(okHttpClientBuilder.build());
+ final DbxRequestConfig config =
DbxRequestConfig.newBuilder(dropboxClientId)
.withHttpRequestor(httpRequestor)
.build();
final DropboxCredentialService credentialService =
context.getProperty(CREDENTIAL_SERVICE)
.asControllerService(DropboxCredentialService.class);
- DropboxCredentialDetails credential =
credentialService.getDropboxCredential();
+ final DropboxCredentialDetails credential =
credentialService.getDropboxCredential();
return new DbxClientV2(config, new
DbxCredential(credential.getAccessToken(), -1L,
credential.getRefreshToken(), credential.getAppKey(),
credential.getAppSecret()));
}
+
+ default String convertFolderName(String folderName) {
+ return "/".equals(folderName) ? "" : folderName;
+ }
+
+ default String getParentPath(String fullPath) {
+ final int idx = fullPath.lastIndexOf("/");
+ final String parentPath = fullPath.substring(0, idx);
+ return "".equals(parentPath) ? "/" : parentPath;
+ }
+
+ default Map<String, String> createAttributeMap(FileMetadata fileMetadata) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(DropboxAttributes.ID, fileMetadata.getId());
+ attributes.put(DropboxAttributes.PATH,
getParentPath(fileMetadata.getPathDisplay()));
+ attributes.put(DropboxAttributes.FILENAME, fileMetadata.getName());
+ attributes.put(DropboxAttributes.SIZE,
valueOf(fileMetadata.getSize()));
+ attributes.put(DropboxAttributes.REVISION, fileMetadata.getRev());
+ attributes.put(DropboxAttributes.TIMESTAMP,
valueOf(fileMetadata.getServerModified().getTime()));
+ return attributes;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
index 3f8c235b0e..0499f18947 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java
@@ -16,16 +16,33 @@
*/
package org.apache.nifi.processors.dropbox;
-import static java.lang.String.format;
-
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
+
+import com.dropbox.core.DbxDownloader;
import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.FileMetadata;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -34,6 +51,7 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -49,15 +67,18 @@ import org.apache.nifi.proxy.ProxySpec;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"dropbox", "storage", "fetch"})
@CapabilityDescription("Fetches files from Dropbox. Designed to be used in
tandem with ListDropbox.")
-@WritesAttribute(attribute = "error.message", description = "When a FlowFile
is routed to 'failure', this attribute is added indicating why the file could "
- + "not be fetched from Dropbox.")
-@SeeAlso(ListDropbox.class)
-@WritesAttributes(
- @WritesAttribute(attribute = FetchDropbox.ERROR_MESSAGE_ATTRIBUTE,
description = "The error message returned by Dropbox when the fetch of a file
fails."))
+@SeeAlso({PutDropbox.class, ListDropbox.class})
+@WritesAttributes({
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC),
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = PATH, description = PATH_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = REVISION, description = REVISION_DESC)}
+)
public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
- public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
-
public static final PropertyDescriptor FILE = new PropertyDescriptor
.Builder().name("file")
.displayName("File")
@@ -83,7 +104,7 @@ public class FetchDropbox extends AbstractProcessor
implements DropboxTrait {
.description("A FlowFile will be routed here for each File
for which fetch was attempted but failed.")
.build();
- public static final Set<Relationship> relationships =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
@@ -96,9 +117,11 @@ public class FetchDropbox extends AbstractProcessor
implements DropboxTrait {
private DbxClientV2 dropboxApiClient;
+ private DbxDownloader<FileMetadata> dbxDownloader;
+
@Override
public Set<Relationship> getRelationships() {
- return relationships;
+ return RELATIONSHIPS;
}
@Override
@@ -108,14 +131,19 @@ public class FetchDropbox extends AbstractProcessor
implements DropboxTrait {
@OnScheduled
public void onScheduled(final ProcessContext context) {
- final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
- String dropboxClientId = format("%s-%s", getClass().getSimpleName(),
getIdentifier());
- dropboxApiClient = getDropboxApiClient(context, proxyConfiguration,
dropboxClientId);
+ dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+ }
+
+ @OnUnscheduled
+ public void shutdown() {
+ if (dbxDownloader != null) {
+ dbxDownloader.close();
+ }
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- FlowFile flowFile = session.get();
+ final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@@ -124,24 +152,35 @@ public class FetchDropbox extends AbstractProcessor
implements DropboxTrait {
fileIdentifier = correctFilePath(fileIdentifier);
FlowFile outFlowFile = flowFile;
+ final long startNanos = System.nanoTime();
try {
- fetchFile(fileIdentifier, session, outFlowFile);
+ FileMetadata fileMetadata = fetchFile(fileIdentifier, session,
outFlowFile);
+
+ final Map<String, String> attributes =
createAttributeMap(fileMetadata);
+ outFlowFile = session.putAllAttributes(outFlowFile, attributes);
+ String url = DROPBOX_HOME_URL + fileMetadata.getPathDisplay();
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().fetch(flowFile, url,
transferMillis);
+
session.transfer(outFlowFile, REL_SUCCESS);
} catch (Exception e) {
handleError(session, flowFile, fileIdentifier, e);
}
}
- private void fetchFile(String fileId, ProcessSession session, FlowFile
outFlowFile) throws DbxException {
- InputStream dropboxInputStream = dropboxApiClient.files()
- .download(fileId)
- .getInputStream();
- session.importFrom(dropboxInputStream, outFlowFile);
+ private FileMetadata fetchFile(String fileId, ProcessSession session,
FlowFile outFlowFile) throws DbxException {
+ try (DbxDownloader<FileMetadata> downloader =
dropboxApiClient.files().download(fileId)) {
+ dbxDownloader = downloader;
+ final InputStream dropboxInputStream = downloader.getInputStream();
+ session.importFrom(dropboxInputStream, outFlowFile);
+ return downloader.getResult();
+ }
}
private void handleError(ProcessSession session, FlowFile flowFile, String
fileId, Exception e) {
getLogger().error("Error while fetching and processing file with id
'{}'", fileId, e);
- FlowFile outFlowFile = session.putAttribute(flowFile,
ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
+ final FlowFile outFlowFile = session.putAttribute(flowFile,
ERROR_MESSAGE, e.getMessage());
+ session.penalize(outFlowFile);
session.transfer(outFlowFile, REL_FAILURE);
}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
index 9f99b3839a..a2c4fc4ffb 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java
@@ -18,6 +18,18 @@ package org.apache.nifi.processors.dropbox;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2;
@@ -69,17 +81,19 @@ import org.apache.nifi.serialization.record.RecordSchema;
" This Processor is designed to run on Primary Node only in a cluster.
If the primary node changes, the new Primary Node will pick up where the" +
" previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({@WritesAttribute(attribute = DropboxFileInfo.ID,
description = "The Dropbox identifier of the file"),
- @WritesAttribute(attribute = DropboxFileInfo.PATH, description = "The
folder path where the file is located"),
- @WritesAttribute(attribute = DropboxFileInfo.FILENAME, description =
"The name of the file"),
- @WritesAttribute(attribute = DropboxFileInfo.SIZE, description = "The
size of the file"),
- @WritesAttribute(attribute = DropboxFileInfo.TIMESTAMP, description =
"The server modified time, when the file was uploaded to Dropbox"),
- @WritesAttribute(attribute = DropboxFileInfo.REVISION, description =
"Revision of the file")})
+@WritesAttributes({
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = PATH, description = PATH_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = REVISION, description = REVISION_DESC)})
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores
necessary data to be able to keep track what files have been listed already. " +
"What exactly needs to be stored depends on the 'Listing Strategy'.")
-@SeeAlso(FetchDropbox.class)
+@SeeAlso({FetchDropbox.class, PutDropbox.class})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class ListDropbox extends AbstractListProcessor<DropboxFileInfo>
implements DropboxTrait {
+
public static final PropertyDescriptor FOLDER = new
PropertyDescriptor.Builder()
.name("folder")
.displayName("Folder")
@@ -148,9 +162,7 @@ public class ListDropbox extends
AbstractListProcessor<DropboxFileInfo> implemen
@OnScheduled
public void onScheduled(final ProcessContext context) {
- final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
- String dropboxClientId = format("%s-%s", getClass().getSimpleName(),
getIdentifier());
- dropboxApiClient = getDropboxApiClient(context, proxyConfiguration,
dropboxClientId);
+ dropboxApiClient = getDropboxApiClient(context, getIdentifier());
}
@Override
@@ -189,20 +201,20 @@ public class ListDropbox extends
AbstractListProcessor<DropboxFileInfo> implemen
try {
Predicate<FileMetadata> metadataFilter =
createMetadataFilter(minTimestamp, minAge);
- ListFolderBuilder listFolderBuilder =
dropboxApiClient.files().listFolderBuilder(convertFolderName(folderName));
+ final ListFolderBuilder listFolderBuilder =
dropboxApiClient.files().listFolderBuilder(convertFolderName(folderName));
ListFolderResult result = listFolderBuilder
.withRecursive(recursive)
.start();
- List<FileMetadata> metadataList = new
ArrayList<>(filterMetadata(result, metadataFilter));
+ final List<FileMetadata> metadataList = new
ArrayList<>(filterMetadata(result, metadataFilter));
while (result.getHasMore()) {
result =
dropboxApiClient.files().listFolderContinue(result.getCursor());
metadataList.addAll(filterMetadata(result, metadataFilter));
}
- for (FileMetadata metadata : metadataList) {
- DropboxFileInfo.Builder builder = new DropboxFileInfo.Builder()
+ for (final FileMetadata metadata : metadataList) {
+ final DropboxFileInfo.Builder builder = new
DropboxFileInfo.Builder()
.id(metadata.getId())
.path(getParentPath(metadata.getPathDisplay()))
.name(metadata.getName())
@@ -268,13 +280,5 @@ public class ListDropbox extends
AbstractListProcessor<DropboxFileInfo> implemen
.collect(toList());
}
- private String getParentPath(String fullPath) {
- int idx = fullPath.lastIndexOf("/");
- String parentPath = fullPath.substring(0, idx);
- return "".equals(parentPath) ? "/" : parentPath;
- }
- private String convertFolderName(String folderName) {
- return "/".equals(folderName) ? "" : folderName;
- }
}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java
new file mode 100644
index 0000000000..124af41d37
--- /dev/null
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import static java.lang.String.format;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
+import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.DbxUploader;
+import com.dropbox.core.RateLimitException;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.FileMetadata;
+import com.dropbox.core.v2.files.UploadErrorException;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+/**
+ * This processor uploads objects to Dropbox.
+ */
+@SeeAlso({ListDropbox.class, FetchDropbox.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"dropbox", "storage", "put"})
+@CapabilityDescription("Puts content to a Dropbox folder.")
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the Dropbox object.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ERROR_MESSAGE, description =
ERROR_MESSAGE_DESC),
+ @WritesAttribute(attribute = ID, description = ID_DESC),
+ @WritesAttribute(attribute = PATH, description = PATH_DESC),
+ @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
+ @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
+ @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
+ @WritesAttribute(attribute = REVISION, description = REVISION_DESC)})
+public class PutDropbox extends AbstractProcessor implements DropboxTrait {
+
+ public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
+
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String OVERWRITE_RESOLUTION = "overwrite";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Files that have been successfully written to Dropbox
are transferred to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Files that could not be written to Dropbox for some
reason are transferred to this relationship.")
+ .build();
+
+ public static final PropertyDescriptor FOLDER = new
PropertyDescriptor.Builder()
+ .name("folder")
+ .displayName("Folder")
+ .description("The path of the Dropbox folder to upload files to. "
+ + "The folder will be created if it does not exist yet.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
+ .defaultValue("/")
+ .build();
+
+ public static final PropertyDescriptor FILE_NAME = new
PropertyDescriptor.Builder()
+ .name("file-name")
+ .displayName("Filename")
+ .description("The full name of the file to upload.")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("${filename}")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new
PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the
same name already exists in the specified Dropbox folder.")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION,
OVERWRITE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-size")
+ .displayName("Chunked Upload Size")
+ .description("Defines the size of a chunk. Used when a FlowFile's
size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller
chunks. "
+ + "It is recommended to specify chunked upload size
smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. "
+ + "Maximum allowed value is 150 MB.")
+ .defaultValue("8 MB")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(1,
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new
PropertyDescriptor.Builder()
+ .name("chunked-upload-threshold")
+ .displayName("Chunked Upload Threshold")
+ .description("The maximum size of the content which is uploaded at
once. FlowFiles larger than this threshold are uploaded in chunks. "
+ + "Maximum allowed value is 150 MB.")
+ .defaultValue("150 MB")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(1,
SINGLE_UPLOAD_LIMIT_IN_BYTES))
+ .required(false)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ CREDENTIAL_SERVICE,
+ FOLDER,
+ FILE_NAME,
+ CONFLICT_RESOLUTION,
+ CHUNKED_UPLOAD_THRESHOLD,
+ CHUNKED_UPLOAD_SIZE,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(false,
ProxySpec.HTTP_AUTH)
+ ));
+
+ private static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(rels);
+ }
+
+ private DbxClientV2 dropboxApiClient;
+
+ private DbxUploader<?, ?, ?> dbxUploader;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ dropboxApiClient = getDropboxApiClient(context, getIdentifier());
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String folder =
context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
+ final String filename =
context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ final long chunkUploadThreshold =
context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
+ .asDataSize(DataUnit.B)
+ .longValue();
+
+ final String conflictResolution =
context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ final long size = flowFile.getSize();
+ final String uploadPath = convertFolderName(folder) + "/" + filename;
+ final long startNanos = System.nanoTime();
+ FileMetadata fileMetadata = null;
+
+ try {
+ try (final InputStream rawIn = session.read(flowFile)) {
+ if (size <= chunkUploadThreshold) {
+ try (UploadUploader uploader =
createUploadUploader(uploadPath, conflictResolution)) {
+ fileMetadata = uploader.uploadAndFinish(rawIn);
+ }
+ } else {
+ fileMetadata = uploadLargeFileInChunks(uploadPath, rawIn,
size, uploadChunkSize, conflictResolution);
+ }
+ } catch (UploadErrorException e) {
+ handleUploadError(conflictResolution, uploadPath, e);
+ } catch (RateLimitException e) {
+ context.yield();
+ throw new ProcessException("Dropbox API rate limit exceeded
while uploading file", e);
+ }
+
+ if (fileMetadata != null) {
+ final Map<String, String> attributes =
createAttributeMap(fileMetadata);
+ String url = DROPBOX_HOME_URL + fileMetadata.getPathDisplay();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, url,
transferMillis);
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ getLogger().error("Exception occurred while uploading file '{}' to
Dropbox folder '{}'", filename, folder, e);
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE,
e.getMessage());
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ @OnUnscheduled
+ public void shutdown() {
+ if (dbxUploader != null) {
+ dbxUploader.close();
+ }
+ }
+
+ private void handleUploadError(final String conflictResolution, final
String uploadPath, final UploadErrorException e) throws UploadErrorException {
+ if (e.errorValue.isPath() &&
e.errorValue.getPathValue().getReason().isConflict()) {
+
+ if (IGNORE_RESOLUTION.equals(conflictResolution)) {
+ getLogger().info("File with the same name [{}] already exists.
Remote file is not modified due to {} being set to '{}'.",
+ uploadPath, CONFLICT_RESOLUTION.getDisplayName(),
conflictResolution);
+ return;
+ } else if (conflictResolution.equals(FAIL_RESOLUTION)) {
+ throw new ProcessException(format("File with the same name
[%s] already exists.", uploadPath), e);
+ }
+ }
+ throw new ProcessException(e);
+ }
+
+ private FileMetadata uploadLargeFileInChunks(String path, InputStream
rawIn, long size, long uploadChunkSize, String conflictResolution) throws
DbxException, IOException {
+ final String sessionId;
+ try (UploadSessionStartUploader uploader =
createUploadSessionStartUploader()) {
+ sessionId = uploader.uploadAndFinish(rawIn,
uploadChunkSize).getSessionId();
+ }
+
+ long uploadedBytes = uploadChunkSize;
+
+ UploadSessionCursor cursor = new UploadSessionCursor(sessionId,
uploadedBytes);
+
+ while (size - uploadedBytes > uploadChunkSize) {
+ try (UploadSessionAppendV2Uploader uploader =
createUploadSessionAppendV2Uploader(cursor)) {
+ uploader.uploadAndFinish(rawIn, uploadChunkSize);
+ uploadedBytes += uploadChunkSize;
+ cursor = new UploadSessionCursor(sessionId, uploadedBytes);
+ }
+ }
+
+ final long remainingBytes = size - uploadedBytes;
+
+ final CommitInfo commitInfo = CommitInfo.newBuilder(path)
+ .withMode(getWriteMode(conflictResolution))
+ .withStrictConflict(true)
+ .withClientModified(new Date(System.currentTimeMillis()))
+ .build();
+
+ try (UploadSessionFinishUploader uploader =
createUploadSessionFinishUploader(cursor, commitInfo)) {
+ return uploader.uploadAndFinish(rawIn, remainingBytes);
+ }
+ }
+
+ private WriteMode getWriteMode(String conflictResolution) {
+ if (OVERWRITE_RESOLUTION.equals(conflictResolution)) {
+ return WriteMode.OVERWRITE;
+ } else {
+ return WriteMode.ADD;
+ }
+ }
+
+ private UploadUploader createUploadUploader(String path, String
conflictResolution) throws DbxException {
+ final UploadUploader uploadUploader = dropboxApiClient
+ .files()
+ .uploadBuilder(path)
+ .withMode(getWriteMode(conflictResolution))
+ .withStrictConflict(true)
+ .start();
+ dbxUploader = uploadUploader;
+ return uploadUploader;
+ }
+
+ private UploadSessionStartUploader createUploadSessionStartUploader()
throws DbxException {
+ final UploadSessionStartUploader sessionStartUploader =
dropboxApiClient
+ .files()
+ .uploadSessionStart();
+ dbxUploader = sessionStartUploader;
+ return sessionStartUploader;
+ }
+
+ private UploadSessionAppendV2Uploader
createUploadSessionAppendV2Uploader(UploadSessionCursor cursor) throws
DbxException {
+ final UploadSessionAppendV2Uploader sessionAppendV2Uploader =
dropboxApiClient
+ .files()
+ .uploadSessionAppendV2(cursor);
+ dbxUploader = sessionAppendV2Uploader;
+ return sessionAppendV2Uploader;
+ }
+
+ private UploadSessionFinishUploader
createUploadSessionFinishUploader(UploadSessionCursor cursor, CommitInfo
commitInfo) throws DbxException {
+ final UploadSessionFinishUploader sessionFinishUploader =
dropboxApiClient
+ .files()
+ .uploadSessionFinish(cursor, commitInfo);
+ dbxUploader = sessionFinishUploader;
+ return sessionFinishUploader;
+ }
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 3496b3318e..ff2f61db82 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.processors.dropbox.ListDropbox
org.apache.nifi.processors.dropbox.FetchDropbox
+org.apache.nifi.processors.dropbox.PutDropbox
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java
new file mode 100644
index 0000000000..b4c8872827
--- /dev/null
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.dropbox;
+
+import static java.util.stream.Collectors.toSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.FileMetadata;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Set;
+import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.mockito.Mock;
+
+public class AbstractDropboxTest {
+ public static final String TEST_FOLDER = "/testFolder";
+ public static final String FILENAME_1 = "file_name_1";
+ public static final String FILENAME_2 = "file_name_2";
+ public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ";
+ public static final String FILE_ID_2 = "id:bdCQUvbpIEAABBAAAAAGKK";
+ public static final long CREATED_TIME = 1659707000;
+ public static final long SIZE = 125;
+ public static final String REVISION = "5e4ddb1320676a5c29261";
+
+ protected TestRunner testRunner;
+
+ @Mock
+ protected DbxClientV2 mockDropboxClient;
+
+ @Mock
+ private DropboxCredentialService mockCredentialService;
+
+ @BeforeEach
+ protected void setUp() throws Exception {
+ mockStandardDropboxCredentialService();
+ }
+
+ protected void assertProvenanceEvent(ProvenanceEventType eventType) {
+ Set<ProvenanceEventType> expectedEventTypes =
Collections.singleton(eventType);
+ Set<ProvenanceEventType> actualEventTypes =
testRunner.getProvenanceEvents().stream()
+ .map(ProvenanceEventRecord::getEventType)
+ .collect(toSet());
+ assertEquals(expectedEventTypes, actualEventTypes);
+ }
+
+ protected void assertNoProvenanceEvent() {
+ assertTrue(testRunner.getProvenanceEvents().isEmpty());
+ }
+
+ protected void mockStandardDropboxCredentialService() throws
InitializationException {
+ String credentialServiceId = "dropbox_credentials";
+
when(mockCredentialService.getIdentifier()).thenReturn(credentialServiceId);
+ testRunner.addControllerService(credentialServiceId,
mockCredentialService);
+ testRunner.enableControllerService(mockCredentialService);
+ testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE,
credentialServiceId);
+ }
+
+ protected FileMetadata createFileMetadata() {
+ return FileMetadata.newBuilder(FILENAME_1, FILE_ID_1,
+ new Date(CREATED_TIME),
+ new Date(CREATED_TIME),
+ REVISION, SIZE)
+ .withPathDisplay(getPath(TEST_FOLDER, FILENAME_1))
+ .withIsDownloadable(true)
+ .build();
+ }
+
+ protected FileMetadata createFileMetadata(
+ String id, String filename,
+ String parent,
+ long createdTime,
+ boolean isDownloadable) {
+ return FileMetadata.newBuilder(filename, id,
+ new Date(createdTime),
+ new Date(createdTime),
+ REVISION, SIZE)
+ .withPathDisplay(getPath(parent, filename))
+ .withIsDownloadable(isDownloadable)
+ .build();
+ }
+
+ protected FileMetadata createFileMetadata(String id,
+ String filename,
+ String parent,
+ long createdTime) {
+ return createFileMetadata(id, filename, parent, createdTime, true);
+ }
+
+ protected void assertOutFlowFileAttributes(MockFlowFile flowFile) {
+ assertOutFlowFileAttributes(flowFile, TEST_FOLDER);
+ }
+
+ protected void assertOutFlowFileAttributes(MockFlowFile flowFile, String
folderName) {
+ flowFile.assertAttributeEquals(DropboxAttributes.ID, FILE_ID_1);
+ flowFile.assertAttributeEquals(DropboxAttributes.FILENAME, FILENAME_1);
+ flowFile.assertAttributeEquals(DropboxAttributes.PATH, folderName);
+ flowFile.assertAttributeEquals(DropboxAttributes.TIMESTAMP,
Long.toString(CREATED_TIME));
+ flowFile.assertAttributeEquals(DropboxAttributes.SIZE,
Long.toString(SIZE));
+ flowFile.assertAttributeEquals(DropboxAttributes.REVISION, REVISION);
+ }
+
+ protected String getPath(String folder, String filename) {
+ return "/".equals(folder) ? folder + filename : folder + "/" +
filename;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java
index bb4bb9537b..c7920aff61 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.nifi.processors.dropbox;
+import static java.lang.String.valueOf;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
import static org.mockito.Mockito.when;
import com.dropbox.core.DbxDownloader;
@@ -28,12 +30,9 @@ import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -42,36 +41,19 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-public class FetchDropboxTest {
-
- public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ";
- public static final String FILE_ID_2 = "id:odTlUvbpIEBBBBBBBBBGGQ";
- public static final String FILENAME = "file_name";
- public static final String FOLDER = "/testFolder";
- public static final String SIZE = "125";
- public static final String CREATED_TIME = "1659707000";
- public static final String REVISION = "5e4ddb1320676a5c29261";
-
- private TestRunner testRunner;
-
- @Mock
- private DbxClientV2 mockDropboxClient;
-
- @Mock
- private DropboxCredentialService credentialService;
+public class FetchDropboxTest extends AbstractDropboxTest {
@Mock
private DbxUserFilesRequests mockDbxUserFilesRequest;
-
@Mock
private DbxDownloader<FileMetadata> mockDbxDownloader;
@BeforeEach
- void setUp() throws Exception {
+ public void setUp() throws Exception {
FetchDropbox testSubject = new FetchDropbox() {
@Override
- public DbxClientV2 getDropboxApiClient(ProcessContext context,
ProxyConfiguration proxyConfiguration, String clientId) {
+ public DbxClientV2 getDropboxApiClient(ProcessContext context,
String id) {
return mockDropboxClient;
}
};
@@ -79,19 +61,18 @@ public class FetchDropboxTest {
testRunner = TestRunners.newTestRunner(testSubject);
when(mockDropboxClient.files()).thenReturn(mockDbxUserFilesRequest);
-
- mockStandardDropboxCredentialService();
+ super.setUp();
}
@Test
void testFileIsDownloadedById() throws Exception {
-
testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}");
when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenReturn(mockDbxDownloader);
when(mockDbxDownloader.getInputStream()).thenReturn(new
ByteArrayInputStream("content".getBytes(UTF_8)));
+ when(mockDbxDownloader.getResult()).thenReturn(createFileMetadata());
- MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1);
+ MockFlowFile inputFlowFile = getMockFlowFile();
testRunner.enqueue(inputFlowFile);
testRunner.run();
@@ -99,17 +80,19 @@ public class FetchDropboxTest {
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertContentEquals("content");
- assertOutFlowFileAttributes(ff0, FILE_ID_1);
+ assertOutFlowFileAttributes(ff0);
+ assertProvenanceEvent(ProvenanceEventType.FETCH);
}
@Test
void testFileIsDownloadedByPath() throws Exception {
testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}");
- when(mockDbxUserFilesRequest.download(FOLDER + "/" +
FILENAME)).thenReturn(mockDbxDownloader);
+ when(mockDbxUserFilesRequest.download(getPath(TEST_FOLDER,
FILENAME_1))).thenReturn(mockDbxDownloader);
when(mockDbxDownloader.getInputStream()).thenReturn(new
ByteArrayInputStream("contentByPath".getBytes(UTF_8)));
+ when(mockDbxDownloader.getResult()).thenReturn(createFileMetadata());
- MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1);
+ MockFlowFile inputFlowFile = getMockFlowFile();
testRunner.enqueue(inputFlowFile);
testRunner.run();
@@ -117,53 +100,38 @@ public class FetchDropboxTest {
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertContentEquals("contentByPath");
- assertOutFlowFileAttributes(ff0, FILE_ID_1);
+ assertOutFlowFileAttributes(ff0);
+ assertProvenanceEvent(ProvenanceEventType.FETCH);
}
@Test
void testFetchFails() throws Exception {
testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}");
- when(mockDbxUserFilesRequest.download(FILE_ID_2)).thenThrow(new
DbxException("Error in Dropbox"));
+ when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenThrow(new
DbxException("Error in Dropbox"));
- MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_2);
+ MockFlowFile inputFlowFile = getMockFlowFile();
testRunner.enqueue(inputFlowFile);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_FAILURE, 1);
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE);
MockFlowFile ff0 = flowFiles.get(0);
- ff0.assertAttributeEquals("error.message", "Error in Dropbox");
- assertOutFlowFileAttributes(ff0, FILE_ID_2);
- }
-
- private void mockStandardDropboxCredentialService() throws
InitializationException {
- String credentialServiceId = "dropbox_credentials";
-
when(credentialService.getIdentifier()).thenReturn(credentialServiceId);
- testRunner.addControllerService(credentialServiceId,
credentialService);
- testRunner.enableControllerService(credentialService);
- testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE,
credentialServiceId);
+ ff0.assertAttributeEquals(ERROR_MESSAGE, "Error in Dropbox");
+ assertOutFlowFileAttributes(ff0);
+ assertNoProvenanceEvent();
}
- private MockFlowFile getMockFlowFile(String fileId) {
+ private MockFlowFile getMockFlowFile() {
MockFlowFile inputFlowFile = new MockFlowFile(0);
Map<String, String> attributes = new HashMap<>();
- attributes.put(DropboxFileInfo.ID, fileId);
- attributes.put(DropboxFileInfo.REVISION, REVISION);
- attributes.put(DropboxFileInfo.FILENAME, FILENAME);
- attributes.put(DropboxFileInfo.PATH, FOLDER);
- attributes.put(DropboxFileInfo.SIZE, SIZE);
- attributes.put(DropboxFileInfo.TIMESTAMP, CREATED_TIME);
+ attributes.put(DropboxAttributes.ID, FILE_ID_1);
+ attributes.put(DropboxAttributes.REVISION, REVISION);
+ attributes.put(DropboxAttributes.FILENAME, FILENAME_1);
+ attributes.put(DropboxAttributes.PATH, TEST_FOLDER);
+ attributes.put(DropboxAttributes.SIZE, valueOf(SIZE));
+ attributes.put(DropboxAttributes.TIMESTAMP, valueOf(CREATED_TIME));
inputFlowFile.putAttributes(attributes);
return inputFlowFile;
}
-
- private void assertOutFlowFileAttributes(MockFlowFile flowFile, String
fileId) {
- flowFile.assertAttributeEquals(DropboxFileInfo.ID, fileId);
- flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION);
- flowFile.assertAttributeEquals(DropboxFileInfo.PATH, FOLDER);
- flowFile.assertAttributeEquals(DropboxFileInfo.SIZE, SIZE);
- flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP,
CREATED_TIME);
- flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME);
- }
}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java
index bdabbf9600..be08c2dc94 100644
---
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.DbxUserFilesRequests;
-import com.dropbox.core.v2.files.FileMetadata;
import com.dropbox.core.v2.files.FolderMetadata;
import com.dropbox.core.v2.files.ListFolderBuilder;
import com.dropbox.core.v2.files.ListFolderResult;
@@ -37,18 +36,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.StreamSupport;
-import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -57,27 +52,16 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-public class ListDropboxTest {
-
- public static final String ID_1 = "id:11111";
- public static final String ID_2 = "id:22222";
- public static final String TEST_FOLDER = "/testFolder";
- public static final String FILENAME_1 = "file_name_1";
- public static final String FILENAME_2 = "file_name_2";
- public static final long SIZE = 125;
- public static final long CREATED_TIME = 1659707000;
- public static final String REVISION = "5e4ddb1320676a5c29261";
+public class ListDropboxTest extends AbstractDropboxTest {
+
+ public static final String FOLDER_ID = "id:11111";
public static final boolean IS_RECURSIVE = true;
public static final long MIN_TIMESTAMP = 1659707000;
public static final long OLD_CREATED_TIME = 1657375066;
- private TestRunner testRunner;
@Mock
private DbxClientV2 mockDropboxClient;
- @Mock
- private DropboxCredentialService credentialService;
-
@Mock
private DbxUserFilesRequests mockDbxUserFilesRequest;
@@ -88,10 +72,10 @@ public class ListDropboxTest {
private ListFolderBuilder mockListFolderBuilder;
@BeforeEach
- void setUp() throws Exception {
+ protected void setUp() throws Exception {
ListDropbox testSubject = new ListDropbox() {
@Override
- public DbxClientV2 getDropboxApiClient(ProcessContext context,
ProxyConfiguration proxyConfiguration, String clientId) {
+ public DbxClientV2 getDropboxApiClient(ProcessContext context,
String id) {
return mockDropboxClient;
}
@@ -104,10 +88,9 @@ public class ListDropboxTest {
testRunner = TestRunners.newTestRunner(testSubject);
- mockStandardDropboxCredentialService();
-
testRunner.setProperty(ListDropbox.RECURSIVE_SEARCH,
Boolean.toString(IS_RECURSIVE));
testRunner.setProperty(ListDropbox.MIN_AGE, "0 sec");
+ super.setUp();
}
@Test
@@ -140,7 +123,7 @@ public class ListDropboxTest {
//root is listed when "" is used in Dropbox API
when(mockDbxUserFilesRequest.listFolderBuilder("")).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(singletonList(
- createFileMetadata(FILENAME_1, folderName, ID_1, CREATED_TIME)
+ createFileMetadata(FILE_ID_1, FILENAME_1, folderName,
CREATED_TIME)
));
testRunner.run();
@@ -148,7 +131,7 @@ public class ListDropboxTest {
testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
- assertFlowFileAttributes(ff0, folderName);
+ assertOutFlowFileAttributes(ff0, folderName);
}
@Test
@@ -159,9 +142,9 @@ public class ListDropboxTest {
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
- createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1,
CREATED_TIME),
- createFolderMetadata("testFolder1", TEST_FOLDER),
- createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2,
CREATED_TIME, false)
+ createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER,
CREATED_TIME),
+ createFolderMetadata(),
+ createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER,
CREATED_TIME, false)
));
testRunner.run();
@@ -169,7 +152,7 @@ public class ListDropboxTest {
testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
- assertFlowFileAttributes(ff0, TEST_FOLDER);
+ assertOutFlowFileAttributes(ff0);
}
@Test
@@ -180,8 +163,8 @@ public class ListDropboxTest {
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
- createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1,
CREATED_TIME),
- createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2,
OLD_CREATED_TIME)
+ createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER,
CREATED_TIME),
+ createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER,
OLD_CREATED_TIME)
));
testRunner.run();
@@ -189,7 +172,7 @@ public class ListDropboxTest {
testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
- assertFlowFileAttributes(ff0, TEST_FOLDER);
+ assertOutFlowFileAttributes(ff0);
}
@Test
@@ -201,8 +184,8 @@ public class ListDropboxTest {
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
- createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1,
CREATED_TIME),
- createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME)
+ createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER,
CREATED_TIME),
+ createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER,
CREATED_TIME)
));
testRunner.run();
@@ -216,52 +199,12 @@ public class ListDropboxTest {
assertEquals(expectedFileNames, actualFileNames);
}
- private void assertFlowFileAttributes(MockFlowFile flowFile, String
folderName) {
- flowFile.assertAttributeEquals(DropboxFileInfo.ID, ID_1);
- flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME_1);
- flowFile.assertAttributeEquals(DropboxFileInfo.PATH, folderName);
- flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP,
Long.toString(CREATED_TIME));
- flowFile.assertAttributeEquals(DropboxFileInfo.SIZE,
Long.toString(SIZE));
- flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION);
- }
-
- private FileMetadata createFileMetadata(
- String filename,
- String parent,
- String id,
- long createdTime,
- boolean isDownloadable) {
- return FileMetadata.newBuilder(filename, id,
- new Date(createdTime),
- new Date(createdTime),
- REVISION, SIZE)
- .withPathDisplay(parent + "/" + filename)
- .withIsDownloadable(isDownloadable)
+ private Metadata createFolderMetadata() {
+ return FolderMetadata.newBuilder(FOLDER_ID)
+ .withPathDisplay(TEST_FOLDER + "/" + FOLDER_ID)
.build();
}
- private FileMetadata createFileMetadata(
- String filename,
- String parent,
- String id,
- long createdTime) {
- return createFileMetadata(filename, parent, id, createdTime, true);
- }
-
- private Metadata createFolderMetadata(String folderName, String parent) {
- return FolderMetadata.newBuilder(folderName)
- .withPathDisplay(parent + "/" + folderName)
- .build();
- }
-
- private void mockStandardDropboxCredentialService() throws Exception {
- String credentialServiceId = "dropbox_credentials";
-
when(credentialService.getIdentifier()).thenReturn(credentialServiceId);
- testRunner.addControllerService(credentialServiceId,
credentialService);
- testRunner.enableControllerService(credentialService);
- testRunner.setProperty(ListDropbox.CREDENTIAL_SERVICE,
credentialServiceId);
- }
-
private void mockRecordWriter() throws InitializationException {
RecordSetWriterFactory recordWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record_writer", recordWriter);
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java
new file mode 100644
index 0000000000..5b2645b290
--- /dev/null
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
+
+ private static final String CONTENT = "content";
+ private static final String CHANGED_CONTENT = "changedContent";
+ private static final String NON_EXISTING_FOLDER = "/doesNotExistYet";
+
+ @BeforeEach
+ public void init() throws Exception {
+ super.init();
+ testRunner.setProperty(PutDropbox.FILE_NAME, "testFile.json");
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ super.teardown();
+ deleteFolderIfExists(NON_EXISTING_FOLDER);
+ }
+
+ @Override
+ protected PutDropbox createTestSubject() {
+ return new PutDropbox();
+ }
+
+ @Test
+ void testUploadFileToExistingDirectory() {
+ testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ }
+
+ @Test
+ void testUploadFileCreateFolderWithSubFolders() {
+ testRunner.setProperty(PutDropbox.FOLDER, NON_EXISTING_FOLDER +
"/subFolder1/subFolder2");
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ }
+
+ @Test
+ void testEmptyFileIsUpladed() {
+ testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ }
+
+ @Test
+ void testUploadExistingFileFailStrategy() {
+ testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.FAIL_RESOLUTION);
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ testRunner.clearTransferState();
+
+ testRunner.enqueue(CHANGED_CONTENT);
+ testRunner.run();
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 1);
+ }
+
+ @Test
+ void testUploadExistingFileWithSameContentFailStrategy() {
+ testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.FAIL_RESOLUTION);
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+
+ testRunner.clearTransferState();
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 1);
+ }
+
+ @Test
+ void testUploadExistingFileOverwriteStrategy() {
+ testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.OVERWRITE_RESOLUTION);
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+
+ testRunner.enqueue(CHANGED_CONTENT);
+ testRunner.run();
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ }
+
+ @Test
+ void testUploadExistingFileIgnoreStrategy() {
+ testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.IGNORE_RESOLUTION);
+
+ testRunner.enqueue(CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+
+ testRunner.enqueue(CHANGED_CONTENT);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
+ testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java
new file mode 100644
index 0000000000..05e4c2b05f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.dropbox;
+
+import static com.dropbox.core.v2.files.UploadError.path;
+import static com.dropbox.core.v2.files.WriteConflictError.FILE;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.dropbox.core.DbxException;
+import com.dropbox.core.LocalizedText;
+import com.dropbox.core.v2.DbxClientV2;
+import com.dropbox.core.v2.files.CommitInfo;
+import com.dropbox.core.v2.files.DbxUserFilesRequests;
+import com.dropbox.core.v2.files.UploadErrorException;
+import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
+import com.dropbox.core.v2.files.UploadSessionCursor;
+import com.dropbox.core.v2.files.UploadSessionFinishUploader;
+import com.dropbox.core.v2.files.UploadSessionStartResult;
+import com.dropbox.core.v2.files.UploadSessionStartUploader;
+import com.dropbox.core.v2.files.UploadUploader;
+import com.dropbox.core.v2.files.UploadWriteFailed;
+import com.dropbox.core.v2.files.WriteError;
+import com.dropbox.core.v2.files.WriteMode;
+import java.io.InputStream;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class PutDropboxTest extends AbstractDropboxTest {
+
+ public static final long CHUNKED_UPLOAD_SIZE_IN_BYTES = 8;
+ public static final long CHUNKED_UPLOAD_THRESHOLD_IN_BYTES = 15;
+ public static final String CONTENT = "1234567890";
+ public static final String LARGE_CONTENT_30B =
"123456789012345678901234567890";
+ public static final String SESSION_ID = "sessionId";
+
+
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ private DbxUserFilesRequests mockDbxUserFilesRequest;
+
+ @Mock
+ private UploadUploader mockUploadUploader;
+
+ @Mock
+ private UploadSessionStartUploader mockUploadSessionStartUploader;
+
+ @Mock
+ private UploadSessionStartResult mockUploadSessionStartResult;
+
+ @Mock
+ private UploadSessionAppendV2Uploader mockUploadSessionAppendV2Uploader;
+
+ @Mock
+ private UploadSessionFinishUploader mockUploadSessionFinishUploader;
+
+ @BeforeEach
+ protected void setUp() throws Exception {
+ final PutDropbox testSubject = new PutDropbox() {
+ @Override
+ public DbxClientV2 getDropboxApiClient(ProcessContext context,
String id) {
+ return mockDropboxClient;
+ }
+ };
+ testRunner = TestRunners.newTestRunner(testSubject);
+ testRunner.setProperty(PutDropbox.FOLDER, TEST_FOLDER);
+ super.setUp();
+ }
+
+ @Test
+ void testFolderValidity() {
+ testRunner.setProperty(PutDropbox.FOLDER, "/");
+ testRunner.assertValid();
+ testRunner.setProperty(PutDropbox.FOLDER, "/tempFolder");
+ testRunner.assertValid();
+ }
+
+ @Test
+ void testUploadChunkSizeValidity() {
+ testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "");
+ testRunner.assertNotValid();
+ testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "40 MB");
+ testRunner.assertValid();
+ testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "152 MB");
+ testRunner.assertNotValid();
+ testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "1024");
+ testRunner.assertNotValid();
+ }
+
+ @Test
+ void testFileUploadFileNameFromProperty() throws Exception {
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+ mockFileUpload(TEST_FOLDER, FILENAME_1);
+ runWithFlowFile();
+
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
+ List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(PutDropbox.REL_SUCCESS);
+ MockFlowFile ff0 = flowFiles.get(0);
+ assertOutFlowFileAttributes(ff0);
+ assertProvenanceEvent(ProvenanceEventType.SEND);
+ }
+
+ @Test
+ void testFileUploadFileNameFromFlowFileAttribute() throws Exception {
+ mockFileUpload(TEST_FOLDER, FILENAME_2);
+
+ final MockFlowFile mockFlowFile = getMockFlowFile(CONTENT);
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", FILENAME_2);
+ mockFlowFile.putAttributes(attributes);
+ testRunner.enqueue(mockFlowFile);
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
+ assertProvenanceEvent(ProvenanceEventType.SEND);
+ }
+
+ @Test
+ void testFileUploadFileToRoot() throws Exception {
+ testRunner.setProperty(PutDropbox.FOLDER, "/");
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+
+ mockFileUpload("/", FILENAME_1);
+
+ runWithFlowFile();
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
+ assertProvenanceEvent(ProvenanceEventType.SEND);
+
+ List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(PutDropbox.REL_SUCCESS);
+ MockFlowFile ff0 = flowFiles.get(0);
+ assertOutFlowFileAttributes(ff0, "/");
+ }
+
+ @Test
+ void testFileUploadWithOverwriteConflictResolutionStrategy() throws
Exception {
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.OVERWRITE_RESOLUTION);
+
+ mockFileUpload(TEST_FOLDER, FILENAME_1, WriteMode.OVERWRITE);
+
+ runWithFlowFile();
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
+ assertProvenanceEvent(ProvenanceEventType.SEND);
+ }
+
+ @Test
+ void testFileUploadError() throws Exception {
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+
+ mockFileUploadError(new DbxException("Dropbox error"));
+
+ runWithFlowFile();
+
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1);
+ List<MockFlowFile> flowFiles =
testRunner.getFlowFilesForRelationship(PutDropbox.REL_FAILURE);
+ MockFlowFile ff0 = flowFiles.get(0);
+ ff0.assertAttributeEquals(ERROR_MESSAGE, "Dropbox error");
+ assertNoProvenanceEvent();
+ }
+
+ @Test
+ void testFileUploadOtherExceptionIsNotIgnored() throws Exception {
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.IGNORE_RESOLUTION);
+
+ mockFileUploadError(getException(WriteError.INSUFFICIENT_SPACE));
+
+ runWithFlowFile();
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1);
+ assertNoProvenanceEvent();
+ }
+
+ @Test
+ void testFileUploadConflictIgnoredWithIgnoreResolutionStrategy() throws
Exception {
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+ testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION,
PutDropbox.IGNORE_RESOLUTION);
+
+ mockFileUploadError(getException(WriteError.conflict(FILE)));
+
+ runWithFlowFile();
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
+ assertNoProvenanceEvent();
+ }
+
+ @Test
+ void testFileUploadConflictNotIgnoredWithDefaultFailStrategy() throws
Exception {
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+
+ mockFileUploadError(getException(WriteError.conflict(FILE)));
+
+ runWithFlowFile();
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1);
+ assertNoProvenanceEvent();
+ }
+
+ @Test
+ void testFileUploadLargeFile() throws Exception {
+ MockFlowFile mockFlowFile = getMockFlowFile(LARGE_CONTENT_30B);
+
+ testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
+ testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE,
CHUNKED_UPLOAD_SIZE_IN_BYTES + " B");
+ testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_THRESHOLD,
CHUNKED_UPLOAD_THRESHOLD_IN_BYTES + " B");
+
+ when(mockDropboxClient.files())
+ .thenReturn(mockDbxUserFilesRequest);
+
+ //start session: 8 bytes uploaded
+ when(mockDbxUserFilesRequest
+ .uploadSessionStart())
+ .thenReturn(mockUploadSessionStartUploader);
+
+ when(mockUploadSessionStartUploader
+ .uploadAndFinish(any(InputStream.class),
eq(CHUNKED_UPLOAD_SIZE_IN_BYTES)))
+ .thenReturn(mockUploadSessionStartResult);
+
+ when(mockUploadSessionStartResult
+ .getSessionId())
+ .thenReturn(SESSION_ID);
+
+ //append session: invoked twice, 2 * 8 bytes uploaded
+ when(mockDbxUserFilesRequest
+ .uploadSessionAppendV2(any(UploadSessionCursor.class)))
+ .thenReturn(mockUploadSessionAppendV2Uploader);
+
+ //finish session: 30 - 8 - 2 * 8 = 6 bytes uploaded
+ CommitInfo commitInfo = CommitInfo.newBuilder(getPath(TEST_FOLDER ,
FILENAME_1))
+ .withMode(WriteMode.ADD)
+ .withStrictConflict(true)
+ .withClientModified(new Date(mockFlowFile.getEntryDate()))
+ .build();
+
+ when(mockDbxUserFilesRequest
+ .uploadSessionFinish(any(UploadSessionCursor.class),
eq(commitInfo)))
+ .thenReturn(mockUploadSessionFinishUploader);
+
+ when(mockUploadSessionFinishUploader
+ .uploadAndFinish(any(InputStream.class), eq(6L)))
+ .thenReturn(createFileMetadata(FILE_ID_1, FILENAME_1,
TEST_FOLDER, CREATED_TIME));
+
+ testRunner.enqueue(mockFlowFile);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
+
+ verify(mockUploadSessionAppendV2Uploader, times(2))
+ .uploadAndFinish(any(InputStream.class),
eq(CHUNKED_UPLOAD_SIZE_IN_BYTES));
+ assertProvenanceEvent(ProvenanceEventType.SEND);
+ }
+
+ private void mockFileUpload(String folder, String filename) throws
Exception {
+ mockFileUpload(folder, filename, WriteMode.ADD);
+ }
+
+ private void mockFileUpload(String folder, String filename, WriteMode
writeMode) throws Exception {
+ when(mockDropboxClient.files())
+ .thenReturn(mockDbxUserFilesRequest);
+
+ when(mockDbxUserFilesRequest
+ .uploadBuilder(getPath(folder, filename))
+ .withMode(writeMode)
+ .withStrictConflict(true)
+ .start())
+ .thenReturn(mockUploadUploader);
+
+ when(mockUploadUploader
+ .uploadAndFinish(any(InputStream.class)))
+ .thenReturn(createFileMetadata(FILE_ID_1, filename, folder,
CREATED_TIME));
+ }
+
+ private void mockFileUploadError(DbxException exception) throws Exception {
+ when(mockDropboxClient.files())
+ .thenReturn(mockDbxUserFilesRequest);
+
+ when(mockDbxUserFilesRequest
+ .uploadBuilder(getPath(TEST_FOLDER, FILENAME_1))
+ .withMode(WriteMode.ADD)
+ .withStrictConflict(true)
+ .start())
+ .thenReturn(mockUploadUploader);
+
+ when(mockUploadUploader
+ .uploadAndFinish(any(InputStream.class)))
+ .thenThrow(exception);
+ }
+
+ private UploadErrorException getException(WriteError writeErrorReason) {
+ return new UploadErrorException("route", "requestId", new
LocalizedText("upload error", "en-us"),
+ path(new UploadWriteFailed(writeErrorReason,
"uploadSessionId")));
+ }
+
+ private MockFlowFile getMockFlowFile(String content) {
+ MockFlowFile inputFlowFile = new MockFlowFile(0);
+ inputFlowFile.setData(content.getBytes(UTF_8));
+ return inputFlowFile;
+ }
+
+ private void runWithFlowFile() {
+ MockFlowFile mockFlowFile = getMockFlowFile(CONTENT);
+ testRunner.enqueue(mockFlowFile);
+ testRunner.run();
+ }
+}
\ No newline at end of file