This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b86466b  NIFI-9669 Adding PutDynamoDBRecord processor
b86466b is described below

commit b86466b4a5eae655a0a708430da722a164cef8aa
Author: Bence Simon <[email protected]>
AuthorDate: Fri Feb 11 16:57:57 2022 +0100

    NIFI-9669 Adding PutDynamoDBRecord processor
    
    This closes #5761.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../AbstractAWSCredentialsProviderProcessor.java   |   1 +
 .../nifi/processors/aws/AbstractAWSProcessor.java  |  27 +-
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml    |   9 +
 .../processors/aws/dynamodb/DeleteDynamoDB.java    |  24 +-
 .../nifi/processors/aws/dynamodb/GetDynamoDB.java  |  24 +-
 .../nifi/processors/aws/dynamodb/PutDynamoDB.java  |  24 +-
 .../processors/aws/dynamodb/PutDynamoDBRecord.java | 378 +++++++++++++++++++++
 .../aws/dynamodb/RecordToItemConverter.java        | 119 +++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../additionalDetails.html                         | 260 ++++++++++++++
 .../aws/dynamodb/PutDynamoDBRecordTest.java        | 325 ++++++++++++++++++
 .../aws/dynamodb/RecordToItemConverterTest.java    | 265 +++++++++++++++
 .../test/resources/dynamodb/multipleChunks.json    |  31 ++
 .../test/resources/dynamodb/multipleInputs.json    |   5 +
 .../test/resources/dynamodb/nonRecordOriented.txt  |   1 +
 .../src/test/resources/dynamodb/singleInput.json   |   1 +
 .../nifi/serialization/SplitRecordSetHandler.java  | 102 ++++++
 .../SplitRecordSetHandlerException.java            |  28 ++
 .../serialization/SplitRecordSetHandlerTest.java   | 251 ++++++++++++++
 19 files changed, 1832 insertions(+), 44 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
index 8b2c560..91cf927 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -50,6 +50,7 @@ public abstract class 
AbstractAWSCredentialsProviderProcessor<ClientType extends
      */
     public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = 
new PropertyDescriptor.Builder()
             .name("AWS Credentials Provider service")
+            .displayName("AWS Credentials Provider Service")
             .description("The Controller Service that is used to obtain aws 
credentials provider")
             .required(false)
             .identifiesControllerService(AWSCredentialsProviderService.class)
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 9a9e0f2..82a67a3 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -46,6 +46,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import 
org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
 import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.proxy.ProxySpec;
 import org.apache.nifi.ssl.SSLContextService;
 
@@ -184,39 +185,46 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
 
     @Override
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(validationContext));
+        final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(validationContext));
 
         final boolean accessKeySet = 
validationContext.getProperty(ACCESS_KEY).isSet();
         final boolean secretKeySet = 
validationContext.getProperty(SECRET_KEY).isSet();
         if ((accessKeySet && !secretKeySet) || (secretKeySet && 
!accessKeySet)) {
-            problems.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("If setting Secret Key or Access Key, must set 
both").build());
+            validationResults.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("If setting Secret Key or Access Key, must set 
both").build());
         }
 
         final boolean credentialsFileSet = 
validationContext.getProperty(CREDENTIALS_FILE).isSet();
         if ((secretKeySet || accessKeySet) && credentialsFileSet) {
-            problems.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("Cannot set both Credentials File and Secret 
Key/Access Key").build());
+            validationResults.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("Cannot set both Credentials File and Secret 
Key/Access Key").build());
         }
 
         final boolean proxyHostSet = 
validationContext.getProperty(PROXY_HOST).isSet();
         final boolean proxyPortSet = 
validationContext.getProperty(PROXY_HOST_PORT).isSet();
+        final boolean proxyConfigServiceSet = 
validationContext.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet();
 
         if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && 
proxyPortSet)) {
-            problems.add(new ValidationResult.Builder().subject("Proxy Host 
and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both 
must be set").build());
+            validationResults.add(new 
ValidationResult.Builder().subject("Proxy Host and 
Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must 
be set").build());
         }
 
         final boolean proxyUserSet = 
validationContext.getProperty(PROXY_USERNAME).isSet();
         final boolean proxyPwdSet = 
validationContext.getProperty(PROXY_PASSWORD).isSet();
 
         if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
-            problems.add(new ValidationResult.Builder().subject("Proxy User 
and Password").valid(false).explanation("If Proxy Username or Proxy Password is 
set, both must be set").build());
+            validationResults.add(new 
ValidationResult.Builder().subject("Proxy User and 
Password").valid(false).explanation("If Proxy Username or Proxy Password is 
set, both must be set").build());
         }
+
         if (proxyUserSet && !proxyHostSet) {
-            problems.add(new 
ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy 
username is set, proxy host must be set").build());
+            validationResults.add(new 
ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy 
Username or Proxy Password is set, Proxy Host must be set").build());
         }
 
-        ProxyConfiguration.validateProxySpec(validationContext, problems, 
PROXY_SPECS);
+        ProxyConfiguration.validateProxySpec(validationContext, 
validationResults, PROXY_SPECS);
+
+        if (proxyHostSet && proxyConfigServiceSet) {
+            validationResults.add(new 
ValidationResult.Builder().subject("Proxy Configuration Service").valid(false)
+                    .explanation("Either Proxy Host and Proxy Port 
must be set or Proxy Configuration Service but not both").build());
+        }
 
-        return problems;
+        return validationResults;
     }
 
     protected ClientConfiguration createConfiguration(final ProcessContext 
context) {
@@ -257,6 +265,9 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
                 componentProxyConfig.setProxyUserName(proxyUsername);
                 componentProxyConfig.setProxyUserPassword(proxyPassword);
                 return componentProxyConfig;
+            } else if 
(context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).isSet())
 {
+                final ProxyConfigurationService configurationService = 
context.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+                return configurationService.getConfiguration();
             }
             return ProxyConfiguration.DIRECT_CONFIGURATION;
         });
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 713e0df..7af8db6 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -59,6 +59,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
@@ -123,6 +128,10 @@
                     <excludes combine.children="append">
                         <exclude>src/test/resources/hello.txt</exclude>
                         
<exclude>src/test/resources/mock-aws-credentials.properties</exclude>
+                        
<exclude>src/test/resources/dynamodb/multipleChunks.json</exclude>
+                        
<exclude>src/test/resources/dynamodb/multipleInputs.json</exclude>
+                        
<exclude>src/test/resources/dynamodb/nonRecordOriented.txt</exclude>
+                        
<exclude>src/test/resources/dynamodb/singleInput.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
index 34f712d..f769439 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -47,23 +47,23 @@ import 
com.amazonaws.services.dynamodbv2.model.AttributeValue;
 import com.amazonaws.services.dynamodbv2.model.WriteRequest;
 
 @SupportsBatching
-@SeeAlso({GetDynamoDB.class, PutDynamoDB.class})
+@SeeAlso({GetDynamoDB.class, PutDynamoDB.class, PutDynamoDBRecord.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "DynamoDB", "AWS", "Delete", "Remove"})
 @CapabilityDescription("Deletes a document from DynamoDB based on hash and 
range key. The key can be string or number."
         + " The request requires all the primary keys for the operation (hash 
or hash and range key)")
 @WritesAttributes({
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
status code")
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = 
"DynamoDB unprocessed keys"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"DynamoDB range key error"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB 
key not found"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"DynamoDB exception message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error 
code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error 
message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error 
type"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error 
service"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB 
error is retryable"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB 
error request id"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB 
status code")
     })
 @ReadsAttributes({
     @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items 
hash key value" ),
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
index 14626ca..37070fd 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -56,24 +56,24 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 @SupportsBatching
-@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
+@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class, PutDynamoDBRecord.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "DynamoDB", "AWS", "Get", "Fetch"})
 @CapabilityDescription("Retrieves a document from DynamoDB based on hash and 
range key.  The key can be string or number."
         + "For any get request all the primary keys are required (hash or hash 
and range based on the table keys)."
         + "A Json Document ('Map') attribute of the DynamoDB item is read into 
the content of the FlowFile.")
 @WritesAttributes({
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
status code")
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = 
"DynamoDB unprocessed keys"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"DynamoDB range key error"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB 
key not found"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"DynamoDB exception message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error 
code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error 
message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error 
type"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error 
service"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB 
error is retryable"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB 
error request id"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB 
status code")
     })
 @ReadsAttributes({
     @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items 
hash key value" ),
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
index 23b8bab..7932de4 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -53,7 +53,7 @@ import com.amazonaws.services.dynamodbv2.model.AttributeValue;
 import com.amazonaws.services.dynamodbv2.model.WriteRequest;
 
 @SupportsBatching
-@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class})
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDBRecord.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"})
 @CapabilityDescription("Puts a document from DynamoDB based on hash and range 
key.  The table can have either hash and range or hash key alone."
@@ -61,17 +61,17 @@ import com.amazonaws.services.dynamodbv2.model.WriteRequest;
     + "In case of hash and range keys both key are required for the operation."
     + " The FlowFile content must be JSON. FlowFile content is mapped to the 
specified Json Document attribute in the DynamoDB item.")
 @WritesAttributes({
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
-    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
error status code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = 
"DynamoDB unprocessed keys"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"DynamoDB range key error"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB 
key not found"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"DynamoDB exception message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error 
code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error 
message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error 
type"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error 
service"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB 
error is retryable"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB 
error request id"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB 
error status code"),
     @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception 
message on creating item")
 })
 @ReadsAttributes({
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
new file mode 100644
index 0000000..05a6109
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including 
partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the 
insert in multiple chunks in order to overcome DynamoDB's limitation on batch 
writing. " +
+        "This might result in partially processed FlowFiles, in which case the 
FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the 
already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = 
PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of 
chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = 
"DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"DynamoDB range key error"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB 
key not found"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"DynamoDB exception message"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error 
code"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error 
message"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error 
type"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error 
service"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB 
error is retryable"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB 
error request id"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB 
error status code"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception 
message on creating item")
+})
+@ReadsAttribute(attribute = 
PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of 
chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one 
batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = 
"dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new 
AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition 
Key Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new 
AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Key 
Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain a field with the same name 
defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new 
AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The 
incoming Records must not contain a field with the same name defined by the 
\"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", 
"Sort By Field",
+            "Uses the value of the Record field identified by the \"Sort Key 
Field\" property as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new 
AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the 
original record's position in the incoming FlowFile. This will be used as sort 
key value.");
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, 
PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign 
partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the 
DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field 
value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used 
as the value of the partition key when using \"Partition by attribute\" 
partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign 
sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the 
DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    // Property descriptors returned by getSupportedPropertyDescriptors(), in the order listed here.
+    // The credentials provider descriptor is a copy of AWS_CREDENTIALS_PROVIDER_SERVICE marked as required.
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    /**
+     * Returns the fixed list of property descriptors held in {@code PROPERTIES}.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = 
flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? 
Integer.parseInt(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 
0;
+        final RecordReaderFactory recordParserFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new 
DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, 
flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = 
recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), 
alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + 
e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new 
HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, 
String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, 
attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else {
+            handleError(context, session, result, outgoingFlowFile);
+        }
+    }
+
+    /**
+     * Routes a FlowFile whose handling did not succeed, based on the cause of the throwable stored in the result.
+     *
+     * @param context used for yielding the processor when the throughput limit was hit
+     * @param session used for transferring the FlowFile
+     * @param result the unsuccessful handler result
+     * @param outgoingFlowFile the FlowFile already carrying the updated chunk counter attribute
+     */
+    private void handleError(
+            final ProcessContext context,
+            final ProcessSession session,
+            final SplitRecordSetHandler.RecordHandlerResult result,
+            final FlowFile outgoingFlowFile
+    ) {
+        final Throwable failure = result.getThrowable();
+        final Throwable rootCause = failure.getCause();
+        final String details = failure.getMessage();
+
+        // Must be checked before AmazonServiceException: the branch order of the original chain is kept.
+        if (rootCause instanceof ProvisionedThroughputExceededException) {
+            // The client reached its write limitation and the request should be retried at a later time.
+            // The processor yields and the FlowFile counts as unprocessed (partially processed).
+            // More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
+            context.yield();
+            session.transfer(outgoingFlowFile, REL_UNPROCESSED);
+            return;
+        }
+
+        if (rootCause instanceof AmazonServiceException) {
+            getLogger().error("Could not process FlowFile due to server exception: " + details, failure);
+            final AmazonServiceException serviceException = (AmazonServiceException) rootCause;
+            session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), serviceException), REL_FAILURE);
+            return;
+        }
+
+        if (rootCause instanceof AmazonClientException) {
+            getLogger().error("Could not process FlowFile due to client exception: " + details, failure);
+            final AmazonClientException clientException = (AmazonClientException) rootCause;
+            session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), clientException), REL_FAILURE);
+            return;
+        }
+
+        getLogger().error("Could not process FlowFile: " + details, failure);
+        session.transfer(outgoingFlowFile, REL_FAILURE);
+    }
+
+    private static class DynamoDbSplitRecordSetHandler extends 
SplitRecordSetHandler {
+        private final DynamoDB dynamoDB;
+        private final String tableName;
+        private final ProcessContext context;
+        private final Map<String, String> flowFileAttributes;
+        private final ComponentLog logger;
+        private TableWriteItems accumulator;
+        private int itemCounter = 0;
+
+        private DynamoDbSplitRecordSetHandler(
+                final int maxChunkSize,
+                final DynamoDB dynamoDB,
+                final ProcessContext context,
+                final Map<String, String> flowFileAttributes,
+                final ComponentLog logger) {
+            super(maxChunkSize);
+            this.dynamoDB = dynamoDB;
+            this.context = context;
+            this.flowFileAttributes = flowFileAttributes;
+            this.logger = logger;
+            this.tableName = 
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+            accumulator = new TableWriteItems(tableName);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) 
throws SplitRecordSetHandlerException {
+            try {
+                if (!wasBatchAlreadyProcessed) {
+                    final BatchWriteItemOutcome outcome = 
dynamoDB.batchWriteItem(accumulator);
+
+                    if (!outcome.getUnprocessedItems().isEmpty()) {
+                        throw new SplitRecordSetHandlerException("Could not 
insert all items. The unprocessed items are: " + 
outcome.getUnprocessedItems().toString());
+                    }
+                } else {
+                    logger.debug("Skipping chunk as was already processed");
+                }
+
+                accumulator = new TableWriteItems(tableName);
+            } catch (final Exception e) {
+                throw new SplitRecordSetHandlerException(e);
+            }
+        }
+
+        @Override
+        protected void addToChunk(final Record record) {
+            itemCounter++;
+            accumulator.addItemToPut(convert(record));
+        }
+
+        private Item convert(final Record record) {
+            final String partitionKeyField  = 
context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String sortKeyStrategy  = 
context.getProperty(SORT_KEY_STRATEGY).getValue();
+            final String sortKeyField  = 
context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
+
+            final Item result = new Item();
+
+            record.getSchema()
+                    .getFields()
+                    .stream()
+                    .filter(field -> 
!field.getFieldName().equals(partitionKeyField))
+                    .filter(field -> 
SORT_NONE.getValue().equals(sortKeyStrategy) || 
!field.getFieldName().equals(sortKeyField))
+                    .forEach(field -> RecordToItemConverter.addField(record, 
result, field.getDataType().getFieldType(), field.getFieldName()));
+
+            addPartitionKey(record, result);
+            addSortKey(record, result);
+            return result;
+        }
+
+        private void addPartitionKey(final Record record, final Item result) {
+            final String partitionKeyStrategy = 
context.getProperty(PARTITION_KEY_STRATEGY).getValue();
+            final String partitionKeyField  = 
context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String partitionKeyAttribute = 
context.getProperty(PARTITION_KEY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+
+            if (PARTITION_BY_FIELD.getValue().equals(partitionKeyStrategy)) {
+                if 
(!record.getSchema().getFieldNames().contains(partitionKeyField)) {
+                    throw new ProcessException("\"" + 
PARTITION_BY_FIELD.getDisplayName() + "\" strategy needs the \"" + 
PARTITION_KEY_FIELD.getDefaultValue() +"\" to present in the record");
+                }
+
+                result.withKeyComponent(partitionKeyField, 
record.getValue(partitionKeyField));
+            } else if 
(PARTITION_BY_ATTRIBUTE.getValue().equals(partitionKeyStrategy)) {
+                if 
(record.getSchema().getFieldNames().contains(partitionKeyField)) {
+                    throw new ProcessException("Cannot reuse existing field 
with " + PARTITION_KEY_STRATEGY.getDisplayName() + " \"" + 
PARTITION_BY_ATTRIBUTE.getDisplayName() + "\"");
+                }
+
+                if (!flowFileAttributes.containsKey(partitionKeyAttribute)) {
+                    throw new ProcessException("Missing attribute \"" + 
partitionKeyAttribute + "\"" );
+                }
+
+                result.withKeyComponent(partitionKeyField, 
flowFileAttributes.get(partitionKeyAttribute));
+            } else if 
(PARTITION_GENERATED.getValue().equals(partitionKeyStrategy)) {
+                if 
(record.getSchema().getFieldNames().contains(partitionKeyField)) {
+                    throw new ProcessException("Cannot reuse existing field 
with " + PARTITION_KEY_STRATEGY.getDisplayName() + " \"" + 
PARTITION_GENERATED.getDisplayName() + "\"");
+                }
+
+                result.withKeyComponent(partitionKeyField, 
UUID.randomUUID().toString());
+            } else {
+                throw new ProcessException("Unknown " + 
PARTITION_KEY_STRATEGY.getDisplayName() + " \"" + partitionKeyStrategy + "\"");
+            }
+        }
+
+        private void addSortKey(final Record record, final Item result) {
+            final String sortKeyStrategy  = 
context.getProperty(SORT_KEY_STRATEGY).getValue();
+            final String sortKeyField  = 
context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
+
+            if (SORT_BY_FIELD.getValue().equals(sortKeyStrategy)) {
+                if 
(!record.getSchema().getFieldNames().contains(sortKeyField)) {
+                    throw new ProcessException(SORT_BY_FIELD.getDisplayName() 
+ " strategy needs the \"" + SORT_KEY_FIELD.getDisplayName() + "\" to present 
in the record");
+                }
+
+                result.withKeyComponent(sortKeyField, 
record.getValue(sortKeyField));
+            } else if (SORT_BY_SEQUENCE.getValue().equals(sortKeyStrategy)) {
+                if (record.getSchema().getFieldNames().contains(sortKeyField)) 
{
+                    throw new ProcessException("Cannot reuse existing field 
with " + SORT_KEY_STRATEGY.getDisplayName() + "  \"" + 
SORT_BY_SEQUENCE.getDisplayName() +"\"");
+                }
+
+                result.withKeyComponent(sortKeyField, itemCounter);
+            } else if (SORT_NONE.getValue().equals(sortKeyStrategy)) {
+                logger.debug("No " + SORT_KEY_STRATEGY.getDisplayName() + " 
was applied");
+            } else {
+                throw new ProcessException("Unknown " + 
SORT_KEY_STRATEGY.getDisplayName() + " \"" + sortKeyStrategy + "\"");
+            }
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverter.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverter.java
new file mode 100644
index 0000000..ab3de73
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helper for copying the fields of a {@link Record} into a DynamoDB {@link Item}.
+ */
+final class RecordToItemConverter {
+
+    private RecordToItemConverter() {
+        // Not intended to be instantiated
+    }
+
+    /**
+     * Adds one field of the record to the item, using the item setter matching the field type.
+     *
+     * See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.ArbitraryDataMapping.html
+     *
+     * @param record the record the value is read from
+     * @param item the item the value is added to
+     * @param fieldType the type of the field, selecting the conversion
+     * @param fieldName the name of the field in the record, also used as attribute name in the item
+     */
+    public static void addField(final Record record, final Item item, final RecordFieldType fieldType, final String fieldName) {
+        switch (fieldType) {
+            case BOOLEAN:
+            case SHORT:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case BYTE:
+            case DOUBLE:
+            case STRING:
+                item.with(fieldName, record.getValue(fieldName));
+                break;
+            case BIGINT:
+                item.withBigInteger(fieldName, new BigInteger(record.getAsString(fieldName)));
+                break;
+            case DECIMAL:
+                item.withNumber(fieldName, new BigDecimal(record.getAsString(fieldName)));
+                break;
+            case TIMESTAMP:
+            case DATE:
+            case TIME:
+                // Date and time values are stored as Strings:
+                // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.String
+                item.withString(fieldName, record.getAsString(fieldName));
+                break;
+            case CHAR:
+                item.withString(fieldName, record.getAsString(fieldName));
+                break;
+            case ENUM:
+                item.withString(fieldName, record.getAsString(fieldName));
+                break;
+            case ARRAY:
+                item.withList(fieldName, record.getAsArray(fieldName));
+                break;
+            case RECORD:
+                // In case of the underlying field is really a record (and not a map for example), schema argument is not used
+                item.withMap(fieldName, getRecordFieldAsMap(record.getAsRecord(fieldName, null)));
+                break;
+            case MAP:
+                item.withMap(fieldName, getMapFieldAsMap(record.getValue(fieldName)));
+                break;
+            case CHOICE: // No similar data type is supported by DynamoDB
+            default:
+                item.withString(fieldName, record.getAsString(fieldName));
+        }
+    }
+
+    /**
+     * Converts a nested record into a map keyed by field name, converting every value with
+     * {@code convertToSupportedType}.
+     */
+    private static Map<String, Object> getRecordFieldAsMap(final Record recordField) {
+        final Map<String, Object> result = new HashMap<>();
+
+        for (final RecordField field : recordField.getSchema().getFields()) {
+            result.put(field.getFieldName(), convertToSupportedType(recordField.getValue(field)));
+        }
+
+        return result;
+    }
+
+    /**
+     * Copies a map value into a new map, converting every value with {@code convertToSupportedType}.
+     *
+     * @throws IllegalArgumentException when the argument is not a {@link Map}
+     */
+    private static Map<String, Object> getMapFieldAsMap(final Object recordField) {
+        if (!(recordField instanceof Map)) {
+            throw new IllegalArgumentException("Map type is expected");
+        }
+
+        final Map<String, Object> result = new HashMap<>();
+        ((Map<String, Object>) recordField).forEach((name, value) -> result.put(name, convertToSupportedType(value)));
+        return result;
+    }
+
+    /**
+     * Converts a value nested in a record or map: records and maps become maps, characters and
+     * date/time values become Strings, enums are replaced by their name, anything else is returned as is.
+     */
+    private static Object convertToSupportedType(Object value) {
+        if (value instanceof Record) {
+            return getRecordFieldAsMap((Record) value);
+        } else if (value instanceof Map) {
+            return getMapFieldAsMap(value);
+        } else if (value instanceof Character || value instanceof Timestamp || value instanceof Date || value instanceof Time) {
+            // toString() is called on the value itself: casting a date/time value to Character would fail
+            return value.toString();
+        } else if (value instanceof Enum) {
+            return ((Enum<?>) value).name();
+        } else {
+            return value;
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9064187..a02ced4 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -25,6 +25,7 @@ org.apache.nifi.processors.aws.lambda.PutLambda
 org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
 org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
 org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
+org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord
 org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
 org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
 org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
new file mode 100644
index 0000000..690864a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
@@ -0,0 +1,260 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> intends to provide the capability to insert 
multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to the <i>PutDynamoDB</i>, this processor is capable of 
processing data in formats other than JSON too and is prepared to add multiple 
fields for a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert bigger batches of 
data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list of data types supported by DynamoDB does not fully overlap with 
the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary during inserting 
the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure 
within Items. In some cases this representation might cause issues with the 
accuracy.</li>
+        <li>Char is not a supported type within DynamoDB, these fields are 
converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the 
given enum.</li>
+        <li>DynamoDB stores time and date related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type, regardless of the actual 
wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handled as strings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Working with DynamoDB when batch inserting comes with two inherent 
limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the 
number of records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might 
attempt multiple
+        insert calls towards the database server. Using this approach, the 
flow does not have to work with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of having an 
unforeseen result at one of the steps.
+        For example, when the incoming FlowFile consists of 70 records, it 
will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contain 25 Items to insert per chunk, and the 
third contains the remaining 20. In some cases it might occur that the first 
two insert operations succeed but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we 
will transfer it to the "failure" or "unprocessed" Relationship according to 
the nature of the issue.
+        In order to keep the information about the successfully processed 
chunks the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute 
to the FlowFile, which has the number of successfully processed chunks as value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other 
limitation the inserts have with DynamoDB: the database has a built-in 
supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to 
process the insert request until a certain amount of time has passed. More 
information on this can be found <a 
href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of the <i>PutDynamoDBRecord</i> we consider these 
cases as temporary issues and the FlowFile will be transferred to the 
"unprocessed" Relationship after which the processor will yield in order to 
avoid further throughput issues.
+        (Other kinds of failures will result in a transfer to the "failure" 
Relationship.)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the 
<i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that 
relationship are considered healthy ones that might be successfully processed 
at a later point.
+        It is possible that the FlowFile contains such a high number of 
records that it needs more than two attempts to fully insert.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the 
attempts, which means, after each trigger it will contain the sum number of 
inserted chunks making it possible for the later attempts to continue from the 
right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key 
and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processor assigns one of the record fields as partition key. The 
name of the record field is specified by the "Partition Key Field" property and 
the value will be the value of the record field with the same name.
+    </p>
+
+    <h4>Partition By Attribute</h4>
+
+    <p>
+        The processor assigns the value of a FlowFile attribute as partition 
key. With this strategy all the Items within a FlowFile will share the same 
partition key value and it is suggested to use for tables also having a sort 
key in order to meet the primary key requirements of the DynamoDB.
+        The property "Partition Key Field" defines the name of the Item field 
and the property "Partition Key Attribute" will specify which attribute's value 
will be assigned to the partition key.
+        With this strategy the "Partition Key Field" must be different from 
the fields contained in the incoming records.
+    </p>
+
+    <h4>Generated UUID</h4>
+
+    <p>
+        By using this strategy the processor will generate a UUID identifier 
for every single Item. This identifier will be used as value for the partition 
key.
+        The name of the field used as partition key is defined by the property 
"Partition Key Field".
+        With this strategy the "Partition Key Field" must be different from 
the fields contained in the incoming records.
+        When using this strategy, the partition key in the DynamoDB table must 
have String data type.
+    </p>
+
+    <h3>Sort Key Strategies</h3>
+
+    <h4>None</h4>
+
+    <p>
+        No sort key will be assigned to the Item. If the table definition 
expects a sort key, using this strategy will result in unsuccessful inserts.
+    </p>
+
+    <h4>Sort By Field</h4>
+
+    <p>
+        The processor assigns one of the record fields as sort key. The name 
of the record field is specified by the "Sort Key Field" property and the value 
will be the value of the record field with the same name.
+        With this strategy the "Sort Key Field" must exist in the incoming 
records.
+    </p>
+
+    <h4>Generate Sequence</h4>
+
+    <p>
+        The processor assigns a generated value to every Item based on the 
original record's position in the incoming FlowFile (regardless of the chunks).
+        The first Item will have the sort key 1, the second will have sort key 
2 and so on. The generated keys are unique within a given FlowFile.
+        The name of the sort key field is specified by the "Sort Key Field" 
property.
+        With this strategy the "Sort Key Field" must be different from the 
fields contained in the incoming records.
+        When using this strategy, the sort key in the DynamoDB table must have 
Number data type.
+
+    </p>
+
+    <h2>Examples</h2>
+
+    <h3>Using fields as partition and sort key</h3>
+
+    <h4>Setup</h4>
+
+    <ul>
+        <li>Partition Key Strategy: Partition By Field</li>
+        <li>Partition Key Field: class</li>
+        <li>Sort Key Strategy: Sort By Field</li>
+        <li>Sort Key Field: size</li>
+    </ul>
+
+    <p>
+        Note: both fields have to exist in the incoming records!
+    </p>
+
+    <h4>Result</h4>
+
+    <p>
+        Using this pair of strategies will result in Items identical to the 
incoming record (not counting the representational changes from the conversion).
+        The fields specified by the properties are added to the Items normally, 
with the only difference being that they are flagged as (primary) key items.
+    </p>
+
+    <h4>Input</h4>
+
+    <code>
+[{"type": "A", "subtype": 4, "class" : "t", "size": 1}]
+    </code>
+
+    <h4>Output (stylized)</h4>
+
+    <ul>
+        <li>type: String field with value "A"</li>
+        <li>subtype: Number field with value 4</li>
+        <li>class: String field with value "t" and serving as partition 
key</li>
+        <li>size: Number field with value 1 and serving as sort key</li>
+    </ul>
+
+    <h3>Using FlowFile filename as partition key with generated sort key</h3>
+
+    <h4>Setup</h4>
+
+    <ul>
+        <li>Partition Key Strategy: Partition By Attribute</li>
+        <li>Partition Key Field: source</li>
+        <li>Partition Key Attribute: filename</li>
+        <li>Sort Key Strategy: Generate Sequence</li>
+        <li>Sort Key Field: sort</li>
+    </ul>
+
+    <h4>Result</h4>
+
+    <p>
+        The FlowFile's filename attribute will be used as partition key. In 
this case all the records within the same FlowFile will share the same 
partition key.
+        In order to avoid collision, if FlowFiles contain multiple records, 
using sort key is suggested.
+        In this case a generated sequence is used which is guaranteed to be 
unique within a given FlowFile.
+    </p>
+
+    <h4>Input</h4>
+
+    <code>
+[
+    {"type": "A", "subtype": 4, "class" : "t", "size": 1},
+    {"type": "B", "subtype": 5, "class" : "m", "size": 2}
+]
+    </code>
+
+    <h4>Output (stylized)</h4>
+
+    <h5>First Item</h5>
+
+    <ul>
+        <li>source: String field with value "data46362.json" and serving as 
partition key</li>
+        <li>type: String field with value "A"</li>
+        <li>subtype: Number field with value 4</li>
+        <li>class: String field with value "t"</li>
+        <li>size: Number field with value 1</li>
+        <li>sort: Number field with value 1 and serving as sort key</li>
+    </ul>
+
+    <h5>Second Item</h5>
+
+    <ul>
+        <li>source: String field with value "data46362.json" and serving as 
partition key</li>
+        <li>type: String field with value "B"</li>
+        <li>subtype: Number field with value 5</li>
+        <li>class: String field with value "m"</li>
+        <li>size: Number field with value 2</li>
+        <li>sort: Number field with value 2 and serving as sort key</li>
+    </ul>
+
+    <h3>Using generated partition key</h3>
+
+    <h4>Setup</h4>
+
+    <ul>
+        <li>Partition Key Strategy: Generated UUID</li>
+        <li>Partition Key Field: identifier</li>
+        <li>Sort Key Strategy: None</li>
+    </ul>
+
+    <h4>Result</h4>
+
+    <p>
+        A generated UUID will be used as partition key. A different UUID will 
be generated for every Item.
+    </p>
+
+    <h4>Input</h4>
+
+    <code>
+        [{"type": "A", "subtype": 4, "class" : "t", "size": 1}]
+    </code>
+
+    <h4>Output (stylized)</h4>
+
+    <ul>
+        <li>identifier: String field with value 
"872ab776-ed73-4d37-a04a-807f0297e06e" and serving as partition key</li>
+        <li>type: String field with value "A"</li>
+        <li>subtype: Number field with value 4</li>
+        <li>class: String field with value "t"</li>
+        <li>size: Number field with value 1</li>
+    </ul>
+</body>
+</html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
new file mode 100644
index 0000000..933a8d2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecordTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import org.apache.nifi.json.JsonTreeReader;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.mockito.stubbing.Answer;
+
+import java.io.FileInputStream;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class PutDynamoDBRecordTest {
+    private static final JsonTreeReader RECORD_READER = new JsonTreeReader();
+    private static final String TABLE_NAME = "table";
+
+    @Mock
+    private DynamoDB mockDynamoDB;
+
+    @Mock
+    private AWSCredentialsProviderService credentialsProviderService;
+
+    private ArgumentCaptor<TableWriteItems> captor;
+
+    private PutDynamoDBRecord testSubject;
+
+    /**
+     * Prepares the fixtures shared by every test: a fresh argument captor, a stubbed identifier for
+     * the credentials service, a DynamoDB mock whose batch write returns an outcome with an empty
+     * unprocessed-items map, and a processor instance that returns that mock from getDynamoDB().
+     */
+    @BeforeEach
+    public void setUp() {
+        captor = ArgumentCaptor.forClass(TableWriteItems.class);
+        
Mockito.when(credentialsProviderService.getIdentifier()).thenReturn("credentialProviderService");
+
+        // Default outcome: the map of unprocessed items is empty
+        final BatchWriteItemOutcome outcome = 
Mockito.mock(BatchWriteItemOutcome.class);
+        final Map<String, List<WriteRequest>> unprocessedItems = new 
HashMap<>();
+        
Mockito.when(outcome.getUnprocessedItems()).thenReturn(unprocessedItems);
+        
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).thenReturn(outcome);
+
+        // The processor under test is handed the mock instead of building a client of its own
+        testSubject = new PutDynamoDBRecord() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+    }
+
+    /**
+     * Runs the processor without enqueuing anything and verifies that the captor recorded no
+     * batch write.
+     */
+    @Test
+    public void testEmptyInput() throws Exception {
+        final TestRunner runner = getTestRunner();
+
+        runner.run();
+
+        Assertions.assertTrue(captor.getAllValues().isEmpty());
+    }
+
+    /**
+     * Enqueues the single-record fixture and verifies that the captured batch write targets the
+     * configured table, carries the one converted item, and that the FlowFile is routed to success.
+     */
+    @Test
+    public void testSingleInput() throws Exception {
+        final TestRunner runner = getTestRunner();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/singleInput.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        final TableWriteItems result = captor.getValue();
+        Assertions.assertEquals(TABLE_NAME, result.getTableName());
+        assertItemsConvertedProperly(result.getItemsToPut(), 1);
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
+    }
+
+    /**
+     * Enqueues the three-record fixture and verifies that the captured batch write targets the
+     * configured table, carries three converted items, and that the FlowFile is routed to success.
+     */
+    @Test
+    public void testMultipleInputs() throws Exception {
+        final TestRunner runner = getTestRunner();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleInputs.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        final TableWriteItems result = captor.getValue();
+        Assertions.assertEquals(TABLE_NAME, result.getTableName());
+        assertItemsConvertedProperly(result.getItemsToPut(), 3);
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
+    }
+
+    /**
+     * Enqueues the 29-record fixture and verifies that it is written in two batch writes of 25 and
+     * 4 items, that the FlowFile is routed to success, and that the chunks-processed attribute is "2".
+     */
+    @Test
+    public void testMultipleChunks() throws Exception {
+        final TestRunner runner = getTestRunner();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleChunks.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        final List<TableWriteItems> results = captor.getAllValues();
+        Assertions.assertEquals(2, results.size());
+
+        final TableWriteItems result1 = results.get(0);
+        Assertions.assertEquals(TABLE_NAME, result1.getTableName());
+        assertItemsConvertedProperly(result1.getItemsToPut(), 25);
+
+        final TableWriteItems result2 = results.get(1);
+        Assertions.assertEquals(TABLE_NAME, result2.getTableName());
+        assertItemsConvertedProperly(result2.getItemsToPut(), 4);
+
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
+        Assertions.assertEquals("2", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
+    }
+
+    /**
+     * Makes the mock throw a throughput exception from the second batch write on and verifies that
+     * the FlowFile is routed to the unprocessed relationship with the chunks-processed attribute "1".
+     */
+    @Test
+    public void testThroughputIssue() throws Exception {
+        final TestRunner runner = getTestRunner();
+        setExceedThroughputAtGivenChunk(2);
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleChunks.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_UNPROCESSED, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_UNPROCESSED).get(0);
+        Assertions.assertEquals("1", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
+    }
+
+    /**
+     * Enqueues the 29-record fixture with the chunks-processed attribute already set to "1" and
+     * verifies that only one batch write with 4 items is issued, that the FlowFile is routed to
+     * success, and that the attribute ends up as "2".
+     */
+    @Test
+    public void testRetryAfterUnprocessed() throws Exception {
+        final TestRunner runner = getTestRunner();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleChunks.json")) {
+            runner.enqueue(input, Collections.singletonMap(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, "1"));
+        }
+        runner.run();
+
+        Assertions.assertEquals(1, captor.getAllValues().size());
+        Assertions.assertEquals(4, captor.getValue().getItemsToPut().size());
+
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_SUCCESS).get(0);
+        Assertions.assertEquals("2", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
+    }
+
+    /**
+     * Makes every batch write throw an AmazonServiceException and verifies that the FlowFile is
+     * routed to failure with the chunks-processed attribute "0".
+     */
+    @Test
+    public void testErrorDuringInsertion() throws Exception {
+        final TestRunner runner = getTestRunner();
+        setServerError();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleInputs.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDynamoDBRecord.REL_FAILURE).get(0);
+        Assertions.assertEquals("0", flowFile.getAttribute(PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE));
+    }
+
+    /**
+     * Switches to the generated partition key strategy with the key field "generated" and verifies
+     * that the written item has four attributes: the three record fields (with "partition" kept as
+     * plain data) plus the "generated" attribute.
+     */
+    @Test
+    public void testGeneratedPartitionKey() throws Exception {
+        final TestRunner runner = getTestRunner();
+        runner.setProperty(PutDynamoDBRecord.PARTITION_KEY_STRATEGY, PutDynamoDBRecord.PARTITION_GENERATED);
+        runner.setProperty(PutDynamoDBRecord.PARTITION_KEY_FIELD, "generated");
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/singleInput.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        final TableWriteItems result = captor.getValue();
+        Assertions.assertEquals(1, result.getItemsToPut().size());
+
+        final Item item = result.getItemsToPut().iterator().next();
+        Assertions.assertEquals(4, item.asMap().size());
+        Assertions.assertEquals("P0", item.get("partition"));
+        Assertions.assertTrue(item.hasAttribute("generated"));
+
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_SUCCESS, 1);
+    }
+
+    /**
+     * Switches to the sequence sort key strategy with the key field "sort" and verifies that the 29
+     * items collected across all batch writes carry the sort values 1 to 29 in order.
+     */
+    @Test
+    public void testGeneratedSortKey() throws Exception {
+        final TestRunner runner = getTestRunner();
+        runner.setProperty(PutDynamoDBRecord.SORT_KEY_STRATEGY, PutDynamoDBRecord.SORT_BY_SEQUENCE);
+        runner.setProperty(PutDynamoDBRecord.SORT_KEY_FIELD, "sort");
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleChunks.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        final List<Item> items = new ArrayList<>();
+        captor.getAllValues().forEach(capture -> items.addAll(capture.getItemsToPut()));
+
+        Assertions.assertEquals(29, items.size());
+
+        // The sequence starts at 1, so the item at list position i is expected to hold i + 1
+        for (int sortKeyValue = 0; sortKeyValue < 29; sortKeyValue++) {
+            Assertions.assertEquals(new BigDecimal(sortKeyValue + 1), items.get(sortKeyValue).get("sort"));
+        }
+    }
+
+    /**
+     * Configures a partition key field that the records do not contain and verifies that no batch
+     * write is attempted and the FlowFile is routed to failure.
+     */
+    @Test
+    public void testPartitionFieldIsMissing() throws Exception {
+        final TestRunner runner = getTestRunner();
+        runner.setProperty(PutDynamoDBRecord.PARTITION_KEY_FIELD, "unknownField");
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/singleInput.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        Mockito.verify(mockDynamoDB, Mockito.never()).batchWriteItem(Mockito.any(TableWriteItems.class));
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
+    }
+
+    /**
+     * Makes the batch write report an unprocessed item and verifies that exactly one batch write
+     * is issued and the FlowFile is routed to failure.
+     */
+    @Test
+    public void testPartiallySuccessfulInsert() throws Exception {
+        final TestRunner runner = getTestRunner();
+        setInsertionError();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/multipleInputs.json")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        Mockito.verify(mockDynamoDB, Mockito.times(1)).batchWriteItem(Mockito.any(TableWriteItems.class));
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
+    }
+
+    /**
+     * Enqueues the non-record-oriented text fixture and verifies that no batch write is captured
+     * and the FlowFile is routed to failure.
+     */
+    @Test
+    public void testNonRecordOrientedInput() throws Exception {
+        final TestRunner runner = getTestRunner();
+
+        // The runner copies the content while enqueuing, so the fixture stream is closed right after
+        try (final FileInputStream input = new FileInputStream("src/test/resources/dynamodb/nonRecordOriented.txt")) {
+            runner.enqueue(input);
+        }
+        runner.run();
+
+        Assertions.assertTrue(captor.getAllValues().isEmpty());
+        runner.assertAllFlowFilesTransferred(PutDynamoDBRecord.REL_FAILURE, 1);
+    }
+
+    /**
+     * Builds a test runner around the processor under test with the baseline configuration used by
+     * the tests: JSON record reader, mocked credentials service, the test table, partitioning by the
+     * "partition" record field and no sort key.
+     *
+     * @return the configured runner
+     * @throws InitializationException if a controller service cannot be added to the runner
+     */
+    private TestRunner getTestRunner() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(testSubject);
+
+        runner.addControllerService("recordReader", RECORD_READER);
+        runner.addControllerService("credentialProviderService", credentialsProviderService);
+
+        runner.enableControllerService(RECORD_READER);
+        runner.enableControllerService(credentialsProviderService);
+
+        runner.setProperty(PutDynamoDBRecord.RECORD_READER, "recordReader");
+        runner.setProperty(PutDynamoDBRecord.AWS_CREDENTIALS_PROVIDER_SERVICE, "credentialProviderService");
+        runner.setProperty(PutDynamoDBRecord.TABLE, TABLE_NAME);
+        runner.setProperty(PutDynamoDBRecord.PARTITION_KEY_STRATEGY, PutDynamoDBRecord.PARTITION_BY_FIELD);
+        runner.setProperty(PutDynamoDBRecord.PARTITION_KEY_FIELD, "partition");
+        // SORT_NONE is a strategy value, so it belongs on the strategy property, not on the field name
+        runner.setProperty(PutDynamoDBRecord.SORT_KEY_STRATEGY, PutDynamoDBRecord.SORT_NONE);
+        return runner;
+    }
+
+    /**
+     * Asserts that the given items match the fixture records: the expected count, three attributes
+     * per item, "value" equal to "new", and "size" / "partition" following the iteration index
+     * (0, "P0"), (1, "P1"), ...
+     *
+     * @param items the items captured from a batch write
+     * @param expectedNumberOfItems the number of items the batch is expected to contain
+     */
+    private void assertItemsConvertedProperly(final Collection<Item> items, 
final int expectedNumberOfItems) {
+        Assertions.assertEquals(expectedNumberOfItems, items.size());
+        int index = 0;
+
+        for (final Item item : items) {
+            Assertions.assertEquals(3, item.asMap().size());
+            Assertions.assertEquals("new", item.get("value"));
+
+            // Numbers are compared as BigDecimal
+            Assertions.assertEquals(new BigDecimal(index), item.get("size"));
+            Assertions.assertEquals("P" + index, item.get("partition"));
+            index++;
+        }
+    }
+
+    /**
+     * Re-stubs the DynamoDB mock so that every batch write returns an outcome whose
+     * unprocessed-items map contains one mocked write request under the key "test".
+     */
+    private void setInsertionError() {
+        final BatchWriteItemOutcome outcome = 
Mockito.mock(BatchWriteItemOutcome.class);
+        final Map<String, List<WriteRequest>> unprocessedItems = new 
HashMap<>();
+        final List<WriteRequest> writeResults = 
Arrays.asList(Mockito.mock(WriteRequest.class));
+        unprocessedItems.put("test", writeResults);
+        
Mockito.when(outcome.getUnprocessedItems()).thenReturn(unprocessedItems);
+        
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).thenReturn(outcome);
+    }
+
+    private void setServerError() {
+        
Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).thenThrow(new 
AmazonServiceException("Error"));
+    }
+
+    /**
+     * Re-stubs the DynamoDB mock so that batch writes return a plain mocked outcome until the given
+     * call number is reached; that call and every later one throw
+     * ProvisionedThroughputExceededException.
+     *
+     * @param chunkToFail 1-based number of the first batch write call that should fail
+     */
+    private void setExceedThroughputAtGivenChunk(final int chunkToFail) {
+        final AtomicInteger numberOfCalls = new AtomicInteger(0);
+
+        Mockito.when(mockDynamoDB.batchWriteItem(captor.capture())).then(new 
Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                // Counts this invocation before deciding whether it has to fail
+                final int calls = numberOfCalls.incrementAndGet();
+
+                if (calls >= chunkToFail) {
+                    throw new 
ProvisionedThroughputExceededException("Throughput exceeded");
+                } else {
+                    return Mockito.mock(BatchWriteItemOutcome.class);
+                }
+            }
+        });
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverterTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverterTest.java
new file mode 100644
index 0000000..57aeab7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverterTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
+import org.apache.nifi.action.Component;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class RecordToItemConverterTest {
+
+    @Test
+    public void testConvertingSimpleFields() {
+        final List<RecordField> schemaFields = new ArrayList<>();
+        schemaFields.add(new RecordField("boolean", 
RecordFieldType.BOOLEAN.getDataType()));
+        schemaFields.add(new RecordField("short", 
RecordFieldType.SHORT.getDataType()));
+        schemaFields.add(new RecordField("int", 
RecordFieldType.INT.getDataType()));
+        schemaFields.add(new RecordField("long", 
RecordFieldType.LONG.getDataType()));
+        schemaFields.add(new RecordField("byte", 
RecordFieldType.BYTE.getDataType()));
+        schemaFields.add(new RecordField("float", 
RecordFieldType.FLOAT.getDataType()));
+        schemaFields.add(new RecordField("double", 
RecordFieldType.DOUBLE.getDataType()));
+        schemaFields.add(new RecordField("bigint", 
RecordFieldType.BIGINT.getDataType()));
+        schemaFields.add(new RecordField("decimal", 
RecordFieldType.DECIMAL.getDataType()));
+        schemaFields.add(new RecordField("timestamp", 
RecordFieldType.TIMESTAMP.getDataType()));
+        schemaFields.add(new RecordField("date", 
RecordFieldType.DATE.getDataType()));
+        schemaFields.add(new RecordField("time", 
RecordFieldType.TIME.getDataType()));
+        schemaFields.add(new RecordField("char", 
RecordFieldType.CHAR.getDataType()));
+        schemaFields.add(new RecordField("enum", 
RecordFieldType.ENUM.getDataType()));
+        schemaFields.add(new RecordField("array", 
RecordFieldType.ARRAY.getDataType()));
+        schemaFields.add(new RecordField("choice", 
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.BOOLEAN.getDataType(), 
RecordFieldType.INT.getDataType())));
+        final RecordSchema schema = new SimpleRecordSchema(schemaFields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("boolean", Boolean.TRUE);
+        values.put("short", Short.MAX_VALUE);
+        values.put("int", Integer.MAX_VALUE);
+        values.put("long", Long.MAX_VALUE);
+        values.put("byte", Byte.MAX_VALUE);
+        values.put("float", Float.valueOf(123.456F));
+        values.put("double", Double.valueOf(1234.5678D));
+        values.put("bigint", BigInteger.TEN);
+        values.put("decimal", new 
BigDecimal("12345678901234567890.123456789012345678901234567890"));
+        values.put("timestamp", new java.sql.Timestamp(37293723L));
+        values.put("date", java.sql.Date.valueOf("1970-01-01"));
+        values.put("time", new java.sql.Time(37293723L));
+        values.put("char", 'c');
+        values.put("enum", Component.Controller);
+        values.put("array", new Integer[] {0,1,10});
+        values.put("choice", Integer.MAX_VALUE);
+        final Record record = new MapRecord(schema, values);
+
+        final Item item = new Item();
+
+        for (final RecordField schemaField : schema.getFields()) {
+            RecordToItemConverter.addField(record, item, 
schemaField.getDataType().getFieldType(), schemaField.getFieldName());
+        }
+
+        Assertions.assertEquals(Boolean.TRUE, item.get("boolean"));
+
+        // Internally Item stores numbers as BigDecimal
+        Assertions.assertEquals(BigDecimal.valueOf(Short.MAX_VALUE), 
item.get("short"));
+        Assertions.assertEquals(BigDecimal.valueOf(Integer.MAX_VALUE), 
item.get("int"));
+        Assertions.assertEquals(BigDecimal.valueOf(Long.MAX_VALUE), 
item.get("long"));
+        Assertions.assertEquals(BigDecimal.valueOf(Byte.MAX_VALUE), 
item.get("byte"));
+        Assertions.assertEquals(new 
BigDecimal("12345678901234567890.123456789012345678901234567890"), 
item.get("decimal"));
+        Assertions.assertEquals(Float.valueOf(123.456F), ((BigDecimal) 
item.get("float")).floatValue(), 0.0001);
+        Assertions.assertEquals(Double.valueOf(1234.5678D), ((BigDecimal) 
item.get("double")).floatValue(), 0.0001);
+
+        Assertions.assertEquals(BigDecimal.valueOf(10), item.get("bigint"));
+
+        // DynamoDB uses string to represent time and date
+        Assertions.assertTrue(item.get("timestamp") instanceof String);
+        Assertions.assertTrue(item.get("date") instanceof String);
+        Assertions.assertTrue(item.get("time") instanceof String);
+
+        // Character is unknown type for DynamoDB, as well as enum
+        Assertions.assertEquals("c", item.get("char"));
+
+        // Enum is not supported in DynamoDB
+        Assertions.assertEquals(Component.Controller.name(), item.get("enum"));
+
+        // DynamoDB uses lists and still keeps the payload datatype
+        Assertions.assertIterableEquals(Arrays.asList(new BigDecimal[] 
{BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.TEN}), (Iterable<?>) 
item.get("array"));
+
+        // DynamoDB cannot handle choice, all values enveloped into choice are 
handled as strings
+        Assertions.assertEquals("2147483647", item.get("choice"));
+    }
+
+    /**
+     * Converts a MAP typed field and verifies that it ends up in the Item as a map with the same
+     * two entries, the character value being represented as a string.
+     */
+    @Test
+    public void testConvertingMapField() {
+        final List<RecordField> starSystemSchemaFields = new ArrayList<>();
+        starSystemSchemaFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        starSystemSchemaFields.add(new RecordField("observedPlanets", RecordFieldType.INT.getDataType()));
+        starSystemSchemaFields.add(new RecordField("star", RecordFieldType.MAP.getDataType()));
+        final RecordSchema starSystemSchema = new SimpleRecordSchema(starSystemSchemaFields);
+
+        final Map<String, Object> star = new HashMap<>();
+        star.put("type", 'G');
+        star.put("isDwarf", false);
+
+        final Map<String, Object> starSystemValues = new HashMap<>();
+        starSystemValues.put("name", "Tau Ceti");
+        // The schema declares observedPlanets as INT, so the value is given as an int
+        starSystemValues.put("observedPlanets", 5);
+        starSystemValues.put("star", star);
+        final Record starSystem = new MapRecord(starSystemSchema, starSystemValues);
+
+        final Item item = new Item();
+
+        // Only the "star" field is converted
+        RecordToItemConverter.addField(starSystem, item, RecordFieldType.MAP, "star");
+
+        final Object result = item.get("star");
+        Assertions.assertTrue(result instanceof Map);
+        final Map<String, Object> resultMap = (Map<String, Object>) result;
+        Assertions.assertEquals(2, resultMap.size());
+        Assertions.assertEquals(false, resultMap.get("isDwarf"));
+        Assertions.assertEquals("G", resultMap.get("type"));
+    }
+
+    /**
+     * Converts a MAP typed field that itself contains a map and verifies that both levels end up
+     * in the Item as maps, the character value being represented as a string.
+     */
+    @Test
+    public void testConvertingMultipleLevelsOfMaps() {
+        final List<RecordField> starSystemSchemaFields = new ArrayList<>();
+        starSystemSchemaFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        starSystemSchemaFields.add(new RecordField("observedPlanets", RecordFieldType.INT.getDataType()));
+        starSystemSchemaFields.add(new RecordField("star", RecordFieldType.MAP.getDataType()));
+        final RecordSchema starSystemSchema = new SimpleRecordSchema(starSystemSchemaFields);
+
+        final Map<String, Object> starType = new HashMap<>();
+        starType.put("type", 'G');
+        starType.put("isDwarf", false);
+
+        final Map<String, Object> star = new HashMap<>();
+        star.put("starType", starType);
+
+        final Map<String, Object> starSystemValues = new HashMap<>();
+        starSystemValues.put("name", "Tau Ceti");
+        // The schema declares observedPlanets as INT, so the value is given as an int
+        starSystemValues.put("observedPlanets", 5);
+        starSystemValues.put("star", star);
+        final Record starSystem = new MapRecord(starSystemSchema, starSystemValues);
+
+        final Item item = new Item();
+
+        // Only the "star" field is converted
+        RecordToItemConverter.addField(starSystem, item, RecordFieldType.MAP, "star");
+
+        final Object result = item.get("star");
+        Assertions.assertTrue(result instanceof Map);
+        final Map<String, Object> resultMap = (Map<String, Object>) result;
+        Assertions.assertEquals(1, resultMap.size());
+        final Object starTypeResult = resultMap.get("starType");
+        Assertions.assertTrue(starTypeResult instanceof Map);
+        final Map<String, Object> starTypeResultMap = (Map<String, Object>) starTypeResult;
+        Assertions.assertEquals(false, starTypeResultMap.get("isDwarf"));
+        Assertions.assertEquals("G", starTypeResultMap.get("type"));
+    }
+
+    /**
+     * Converts a RECORD typed field and verifies that the nested record ends up in the Item as a
+     * map with the same two entries, the character value being represented as a string.
+     */
+    @Test
+    public void testConvertingRecordField() {
+        // Schema of the nested "star" record
+        final List<RecordField> starSchemaFields = new ArrayList<>();
+        starSchemaFields.add(new RecordField("type", 
RecordFieldType.CHAR.getDataType()));
+        starSchemaFields.add(new RecordField("isDwarf", 
RecordFieldType.BOOLEAN.getDataType()));
+        final RecordSchema starSchema = new 
SimpleRecordSchema(starSchemaFields);
+
+        // Schema of the enclosing record
+        final List<RecordField> starSystemSchemaFields = new ArrayList<>();
+        starSystemSchemaFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        starSystemSchemaFields.add(new RecordField("observedPlanets", 
RecordFieldType.INT.getDataType()));
+        starSystemSchemaFields.add(new RecordField("star", 
RecordFieldType.RECORD.getRecordDataType(starSchema)));
+        final RecordSchema starSystemSchema = new 
SimpleRecordSchema(starSystemSchemaFields);
+
+        final Map<String, Object> starValues = new HashMap<>();
+        starValues.put("type", 'G');
+        starValues.put("isDwarf", false);
+        final Record star = new MapRecord(starSchema, starValues);
+
+        final Map<String, Object> starSystemValues = new HashMap<>();
+        starSystemValues.put("name", "Tau Ceti");
+        starSystemValues.put("observedPlanets", 5);
+        starSystemValues.put("star", star);
+        final Record starSystem = new MapRecord(starSystemSchema, 
starSystemValues);
+
+        final Item item = new Item();
+
+        // Only the "star" field is converted
+        RecordToItemConverter.addField(starSystem, item, 
RecordFieldType.RECORD, "star");
+
+        final Object result = item.get("star");
+        Assertions.assertTrue(result instanceof Map);
+        final Map<String, Object> resultMap = (Map<String, Object>) result;
+        Assertions.assertEquals(2, resultMap.size());
+        Assertions.assertEquals(false, resultMap.get("isDwarf"));
+        Assertions.assertEquals("G", resultMap.get("type"));
+    }
+
+    /**
+     * Converts a RECORD typed field that itself contains a record and verifies that both levels
+     * end up in the Item as maps, the character value being represented as a string.
+     */
+    @Test
+    public void testConvertingMultipleLevelsOfRecords() {
+        // Innermost schema: the "starType" record
+        final List<RecordField> starTypeSchemaFields = new ArrayList<>();
+        starTypeSchemaFields.add(new RecordField("type", 
RecordFieldType.CHAR.getDataType()));
+        starTypeSchemaFields.add(new RecordField("isDwarf", 
RecordFieldType.BOOLEAN.getDataType()));
+        final RecordSchema starTypeSchema = new 
SimpleRecordSchema(starTypeSchemaFields);
+
+        // Middle schema: the "star" record wrapping "starType"
+        final List<RecordField> starSchemaFields = new ArrayList<>();
+        starSchemaFields.add(new RecordField("starType", 
RecordFieldType.RECORD.getRecordDataType(starTypeSchema)));
+        final RecordSchema starSchema = new 
SimpleRecordSchema(starSchemaFields);
+
+        // Outer schema: the enclosing record
+        final List<RecordField> starSystemSchemaFields = new ArrayList<>();
+        starSystemSchemaFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        starSystemSchemaFields.add(new RecordField("observedPlanets", 
RecordFieldType.INT.getDataType()));
+        starSystemSchemaFields.add(new RecordField("star", 
RecordFieldType.RECORD.getRecordDataType(starSchema)));
+        final RecordSchema starSystemSchema = new 
SimpleRecordSchema(starSystemSchemaFields);
+
+        final Map<String, Object> starTypeValues = new HashMap<>();
+        starTypeValues.put("type", 'G');
+        starTypeValues.put("isDwarf", false);
+        final Record starType = new MapRecord(starTypeSchema, starTypeValues);
+
+        final Map<String, Object> starValues = new HashMap<>();
+        starValues.put("starType", starType);
+        final Record star = new MapRecord(starSchema, starValues);
+
+        final Map<String, Object> starSystemValues = new HashMap<>();
+        starSystemValues.put("name", "Tau Ceti");
+        starSystemValues.put("observedPlanets", 5);
+        starSystemValues.put("star", star);
+        final Record starSystem = new MapRecord(starSystemSchema, 
starSystemValues);
+
+        final Item item = new Item();
+
+        // Only the "star" field is converted
+        RecordToItemConverter.addField(starSystem, item, 
RecordFieldType.RECORD, "star");
+
+        final Object result = item.get("star");
+        Assertions.assertTrue(result instanceof Map);
+        final Map<String, Object> resultMap = (Map<String, Object>) result;
+        Assertions.assertEquals(1, resultMap.size());
+        final Object fieldResult = resultMap.get("starType");
+        Assertions.assertTrue(fieldResult instanceof Map);
+        final Map<String, Object> fieldResultMap = (Map<String, Object>) 
fieldResult;
+        Assertions.assertEquals(false, fieldResultMap.get("isDwarf"));
+        Assertions.assertEquals("G", fieldResultMap.get("type"));
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/multipleChunks.json
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/multipleChunks.json
new file mode 100644
index 0000000..bb64373
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/multipleChunks.json
@@ -0,0 +1,31 @@
+[
+  {"value": "new", "size": 0, "partition" : "P0"},
+  {"value": "new", "size": 1, "partition" : "P1"},
+  {"value": "new", "size": 2, "partition" : "P2"},
+  {"value": "new", "size": 3, "partition" : "P3"},
+  {"value": "new", "size": 4, "partition" : "P4"},
+  {"value": "new", "size": 5, "partition" : "P5"},
+  {"value": "new", "size": 6, "partition" : "P6"},
+  {"value": "new", "size": 7, "partition" : "P7"},
+  {"value": "new", "size": 8, "partition" : "P8"},
+  {"value": "new", "size": 9, "partition" : "P9"},
+  {"value": "new", "size": 10, "partition" : "P10"},
+  {"value": "new", "size": 11, "partition" : "P11"},
+  {"value": "new", "size": 12, "partition" : "P12"},
+  {"value": "new", "size": 13, "partition" : "P13"},
+  {"value": "new", "size": 14, "partition" : "P14"},
+  {"value": "new", "size": 15, "partition" : "P15"},
+  {"value": "new", "size": 16, "partition" : "P16"},
+  {"value": "new", "size": 17, "partition" : "P17"},
+  {"value": "new", "size": 18, "partition" : "P18"},
+  {"value": "new", "size": 19, "partition" : "P19"},
+  {"value": "new", "size": 20, "partition" : "P20"},
+  {"value": "new", "size": 21, "partition" : "P21"},
+  {"value": "new", "size": 22, "partition" : "P22"},
+  {"value": "new", "size": 23, "partition" : "P23"},
+  {"value": "new", "size": 24, "partition" : "P24"},
+  {"value": "new", "size": 0, "partition" : "P0"},
+  {"value": "new", "size": 1, "partition" : "P1"},
+  {"value": "new", "size": 2, "partition" : "P2"},
+  {"value": "new", "size": 3, "partition" : "P3"}
+]
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/multipleInputs.json
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/multipleInputs.json
new file mode 100644
index 0000000..4e484fb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/multipleInputs.json
@@ -0,0 +1,5 @@
+[
+  {"value": "new", "size": 0, "partition" : "P0"},
+  {"value": "new", "size": 1, "partition" : "P1"},
+  {"value": "new", "size": 2, "partition" : "P2"}
+]
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/nonRecordOriented.txt
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/nonRecordOriented.txt
new file mode 100644
index 0000000..946689f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/nonRecordOriented.txt
@@ -0,0 +1 @@
+loremIpsum
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/singleInput.json
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/singleInput.json
new file mode 100644
index 0000000..9356093
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/resources/dynamodb/singleInput.json
@@ -0,0 +1 @@
+[{"value": "new", "size": 0, "partition" : "P0"}]
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandler.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandler.java
new file mode 100644
index 0000000..808c1a5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+
+/**
+ * Base class for consuming a {@link RecordSet} in chunks of bounded size.
+ *
+ * <p>Each record read from the set is handed to
+ * {@link #addToChunk(Record)}. After every {@code maximumChunkSize}
+ * records, and once more for a trailing partial chunk,
+ * {@link #handleChunk(boolean)} is called. Processing stops at the first
+ * chunk that fails with a {@link SplitRecordSetHandlerException}.</p>
+ */
+public abstract class SplitRecordSetHandler {
+    final int maximumChunkSize;
+
+    /**
+     * @param maximumChunkSize number of records that make up a full chunk
+     * @throws IllegalArgumentException if the size is smaller than 1
+     */
+    protected SplitRecordSetHandler(final int maximumChunkSize) {
+        if (maximumChunkSize < 1) {
+            throw new IllegalArgumentException(
+                    "The maximum chunk size must be a positive number");
+        }
+
+        this.maximumChunkSize = maximumChunkSize;
+    }
+
+    /**
+     * Handles the record set with no chunk marked as already processed.
+     *
+     * <p>Declares the same checked exception as the two-argument overload
+     * it delegates to, so callers are not forced to deal with a raw
+     * {@code Exception}.</p>
+     *
+     * @param recordSet the records to split into chunks
+     * @return the outcome of the processing
+     * @throws IOException propagated from {@code recordSet.next()}
+     */
+    public final RecordHandlerResult handle(final RecordSet recordSet)
+            throws IOException {
+        return handle(recordSet, 0);
+    }
+
+    /**
+     * Handles the record set chunk by chunk.
+     *
+     * @param recordSet the records to split into chunks
+     * @param alreadyProcessedChunks number of leading chunks for which
+     *        {@link #handleChunk(boolean)} is called with {@code true}
+     * @return a result holding the number of chunks handled without error
+     *         and, on failure, the exception that stopped the processing
+     * @throws IOException propagated from {@code recordSet.next()}
+     */
+    public final RecordHandlerResult handle(
+            final RecordSet recordSet,
+            final int alreadyProcessedChunks) throws IOException {
+        Record record;
+        int currentChunkNumber = 0;
+        int currentChunkSize = 0;
+
+        while ((record = recordSet.next()) != null) {
+            addToChunk(record);
+            currentChunkSize++;
+
+            if (currentChunkSize == maximumChunkSize) {
+                try {
+                    // Chunks with an index below alreadyProcessedChunks
+                    // are flagged as already processed.
+                    handleChunk(alreadyProcessedChunks > currentChunkNumber);
+                    currentChunkNumber++;
+                    currentChunkSize = 0;
+                } catch (final SplitRecordSetHandlerException e) {
+                    // currentChunkNumber was not incremented for the
+                    // failing chunk, so it counts the successful ones only.
+                    return new RecordHandlerResult(currentChunkNumber, e);
+                }
+            }
+        }
+
+        // Handling the last, not fully filled chunk
+        if (currentChunkSize != 0) {
+            try {
+                handleChunk(alreadyProcessedChunks > currentChunkNumber);
+                currentChunkNumber++;
+            } catch (final SplitRecordSetHandlerException e) {
+                return new RecordHandlerResult(currentChunkNumber, e);
+            }
+        }
+
+        return new RecordHandlerResult(currentChunkNumber);
+    }
+
+    /**
+     * Outcome of a {@code handle} call: the number of chunks handled without
+     * error and, when processing was stopped, the exception that caused it.
+     */
+    public static class RecordHandlerResult {
+        private final int successfulChunks;
+        private final Throwable throwable;
+
+        private RecordHandlerResult(
+                final int successfulChunks, final Throwable throwable) {
+            this.successfulChunks = successfulChunks;
+            this.throwable = throwable;
+        }
+
+        private RecordHandlerResult(final int successfulChunks) {
+            this.successfulChunks = successfulChunks;
+            this.throwable = null;
+        }
+
+        /**
+         * @return the number of chunks for which {@code handleChunk}
+         *         returned without throwing
+         */
+        public int getSuccessfulChunks() {
+            return successfulChunks;
+        }
+
+        /**
+         * @return {@code true} when no exception was recorded
+         */
+        public boolean isSuccess() {
+            return throwable == null;
+        }
+
+        /**
+         * @return the recorded exception, or {@code null} on success
+         */
+        public Throwable getThrowable() {
+            return throwable;
+        }
+    }
+
+    /**
+     * Processes the records collected since the previous call.
+     *
+     * @param wasBatchAlreadyProcessed {@code true} when the index of this
+     *        chunk is below the {@code alreadyProcessedChunks} argument of
+     *        {@code handle}
+     * @throws SplitRecordSetHandlerException to stop the processing and
+     *         report the failure in the result
+     */
+    protected abstract void handleChunk(boolean wasBatchAlreadyProcessed)
+            throws SplitRecordSetHandlerException;
+
+    /**
+     * Receives the next record of the chunk being assembled.
+     *
+     * @param record the record just read from the record set
+     */
+    protected abstract void addToChunk(Record record);
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandlerException.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandlerException.java
new file mode 100644
index 0000000..5800091
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandlerException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+/**
+ * Checked exception declared by
+ * {@code SplitRecordSetHandler#handleChunk(boolean)} to report that a chunk
+ * could not be handled.
+ */
+public final class SplitRecordSetHandlerException extends Exception {
+
+    /**
+     * Creates an exception that only carries a description of the failure.
+     *
+     * @param message the detail message
+     */
+    public SplitRecordSetHandlerException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps the underlying failure.
+     *
+     * @param cause the error that made the chunk handling fail
+     */
+    public SplitRecordSetHandlerException(final Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/serialization/SplitRecordSetHandlerTest.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/serialization/SplitRecordSetHandlerTest.java
new file mode 100644
index 0000000..bfb235a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/serialization/SplitRecordSetHandlerTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests the chunking logic of {@link SplitRecordSetHandler} through a
+ * recording subclass whose {@code handleChunk} outcomes are scripted per call.
+ */
+public class SplitRecordSetHandlerTest {
+    private static final int MAXIMUM_CHUNK_SIZE = 3;
+    // Wrapped into the SplitRecordSetHandlerException thrown by the test handler.
+    private static final Throwable CAUSE = new RuntimeException("Test");
+
+    private TestableSplitRecordSetHandler testSubject;
+    private SplitRecordSetHandler.RecordHandlerResult result;
+
+    @BeforeEach
+    public void setUp() {
+        testSubject = new TestableSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE);
+    }
+
+    // No records: handleChunk must never be called.
+    @Test
+    public void testEmptyBatch() throws Exception {
+        executeHandlerWithRecordSet(0);
+
+        assertSuccessfulHandling(0);
+        assertChunkSizeIs(0);
+    }
+
+    @Test
+    public void testSuccessWhenSingleNonFullChunk() throws Exception {
+        setHandlerToSuccess();
+        executeHandlerWithRecordSet(2);
+
+        assertSuccessfulHandling(1);
+        assertChunkSizeIs(2);
+    }
+
+    @Test
+    public void testFailureWhenSingleNonFullChunk() throws Exception {
+        setHandlerToFail();
+        executeHandlerWithRecordSet(2);
+
+        assertChunkSizeIs(2);
+        assertFailedHandling(0);
+        Assertions.assertEquals(1, testSubject.getHandleChunkCalls().size()); 
// Tried once
+    }
+
+    @Test
+    public void testSuccessWhenSingleFullChunk() throws Exception {
+        setHandlerToSuccess();
+        executeHandlerWithRecordSet(MAXIMUM_CHUNK_SIZE);
+
+        assertSuccessfulHandling(1);
+        assertChunkSizeIs(MAXIMUM_CHUNK_SIZE);
+    }
+
+    @Test
+    public void testFailureWhenSingleFullChunk() throws Exception {
+        setHandlerToFail();
+        executeHandlerWithRecordSet(MAXIMUM_CHUNK_SIZE);
+
+        assertChunkSizeIs(MAXIMUM_CHUNK_SIZE);
+        assertFailedHandling(0);
+        Assertions.assertEquals(1, testSubject.getHandleChunkCalls().size()); 
// Tried once
+    }
+
+    // 5 records with chunk size 3: one full chunk and one partial chunk.
+    @Test
+    public void testSuccessWhenMultipleChunks() throws Exception {
+        setHandlerToSuccess();
+        executeHandlerWithRecordSet(5);
+
+        assertSuccessfulHandling(2);
+        assertChunkSizeIs(5);
+    }
+
+    // 8 records would give 3 chunks, but the second call fails and stops the run.
+    @Test
+    public void testFailureWhenMultipleChunks() throws Exception {
+        setHandlerToSequence(true, false);
+        executeHandlerWithRecordSet(3 * MAXIMUM_CHUNK_SIZE - 1);
+
+        assertFailedHandling(1);
+        Assertions.assertEquals(2, testSubject.getHandleChunkCalls().size());
+    }
+
+    // 31 records with chunk size 3: ten full chunks and one partial chunk.
+    @Test
+    public void testSuccessWhenNumerousChunks() throws Exception {
+        setHandlerToSuccess();
+        executeHandlerWithRecordSet(31);
+
+        assertSuccessfulHandling(11);
+        Assertions.assertEquals(31, testSubject.getChunks().size());
+    }
+
+    // 8 records give 3 chunks; the first two are flagged as already processed.
+    @Test
+    public void testStartAtLaterPoint() throws Exception {
+        setHandlerToSuccess();
+        executeHandlerWithRecordSet(MAXIMUM_CHUNK_SIZE * 3 - 1, 2);
+
+        Assertions.assertTrue(result.isSuccess());
+        Assertions.assertNull(result.getThrowable());
+        Assertions.assertEquals(3, result.getSuccessfulChunks()); // 2 already processed + 1 new
+        Assertions.assertIterableEquals(Arrays.asList(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE), testSubject.getHandleChunkCalls());
+        Assertions.assertEquals(MAXIMUM_CHUNK_SIZE * 3 - 1, 
testSubject.getChunks().size());
+    }
+
+    // The already-processed count (9) exceeds the 3 chunks actually present.
+    @Test
+    public void testStartAtAPointAfterLastChunk() throws Exception {
+        setHandlerToSuccess();
+        executeHandlerWithRecordSet(MAXIMUM_CHUNK_SIZE * 3 - 1, 9);
+
+        Assertions.assertTrue(result.isSuccess());
+        Assertions.assertNull(result.getThrowable());
+        Assertions.assertEquals(3, result.getSuccessfulChunks()); // all 3 already processed
+        Assertions.assertIterableEquals(Arrays.asList(Boolean.TRUE, 
Boolean.TRUE, Boolean.TRUE), testSubject.getHandleChunkCalls());
+    }
+
+    // 11 records give 4 chunks; the third handleChunk call fails, so the
+    // fourth chunk is never handled.
+    @Test
+    public void testStartAtLaterPointAndFail() throws Exception {
+        setHandlerToSequence(true, true, false);
+        executeHandlerWithRecordSet(MAXIMUM_CHUNK_SIZE * 4 - 1, 1);
+
+        assertFailedHandling(2);
+        Assertions.assertIterableEquals(Arrays.asList(Boolean.TRUE, 
Boolean.FALSE, Boolean.FALSE), testSubject.getHandleChunkCalls());
+    }
+
+    // Runs the handler through the single-argument handle overload.
+    private void executeHandlerWithRecordSet(final int recordSetSize) throws 
Exception {
+        result = testSubject.handle(getRecordSet(recordSetSize));
+    }
+
+    // Runs the handler with an explicit number of already processed chunks.
+    private void executeHandlerWithRecordSet(final int recordSetSize, final 
int alreadyProcessedChunks) throws Exception {
+        result = testSubject.handle(getRecordSet(recordSetSize), 
alreadyProcessedChunks);
+    }
+
+    // Expects a successful result and one handleChunk(false) call per chunk.
+    private void assertSuccessfulHandling(final int 
expectedNumberOfSuccessfulChunks) {
+        Assertions.assertTrue(result.isSuccess());
+        Assertions.assertNull(result.getThrowable());
+        Assertions.assertEquals(expectedNumberOfSuccessfulChunks, 
result.getSuccessfulChunks());
+        final List<Boolean> expectedCalls = new ArrayList<>();
+
+        for (int i = 0; i < expectedNumberOfSuccessfulChunks; i++) {
+            expectedCalls.add(false);
+        }
+
+        Assertions.assertIterableEquals(expectedCalls, 
testSubject.getHandleChunkCalls());
+    }
+
+    // Expects a failed result whose throwable wraps CAUSE.
+    private void assertFailedHandling(final int 
expectedNumberOfSuccessfulChunks) {
+        Assertions.assertFalse(result.isSuccess());
+        Assertions.assertEquals(CAUSE, result.getThrowable().getCause());
+        Assertions.assertEquals(expectedNumberOfSuccessfulChunks, 
result.getSuccessfulChunks());
+    }
+
+    // Checks the total number of records passed to addToChunk.
+    private void assertChunkSizeIs(final int expectedSize) {
+        Assertions.assertEquals(expectedSize, testSubject.getChunks().size());
+    }
+
+    private void setHandlerToSuccess() {
+        testSubject.setUpHandleChunkResults(true);
+    }
+
+    private void setHandlerToFail() {
+        testSubject.setUpHandleChunkResults(false);
+    }
+
+    // Scripts the outcome of consecutive handleChunk calls (false = throw).
+    private void setHandlerToSequence(Boolean... sequence) {
+        testSubject.setUpHandleChunkResults(sequence);
+    }
+
+    // Builds a record set of the given size with identical records.
+    private RecordSet getRecordSet(int size) {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("payload", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final List<Record> records = new ArrayList<>();
+
+        for (int i = 1; i <= size; i++) {
+            final Map<String, Object> values = new HashMap<>();
+            // NOTE(review): every record gets id 1; the loop index is not
+            // used - confirm whether "i" was intended here.
+            values.put("id", 1);
+            values.put("payload", "value");
+            final Record record = new MapRecord(schema, values);
+            records.add(record);
+        }
+
+        final RecordSet recordSet = new ListRecordSet(schema, records);
+        return recordSet;
+    }
+
+    // Handler that records every added record and every handleChunk flag.
+    private static class TestableSplitRecordSetHandler extends 
SplitRecordSetHandler {
+        // Holds the individual records received, not grouped by chunk.
+        private final List<Record> chunks = new ArrayList<>();
+        private final List<Boolean> handleChunkCalls = new ArrayList<>();
+        private List<Boolean> handleChunkResults;
+
+        protected TestableSplitRecordSetHandler(final int maximumChunkSize) {
+            super(maximumChunkSize);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) 
throws SplitRecordSetHandlerException {
+            handleChunkCalls.add(wasBatchAlreadyProcessed);
+            // Once the scripted results run out, the last one is reused.
+            final int index = Math.min(handleChunkCalls.size() - 1, 
handleChunkResults.size() - 1);
+
+            if (!handleChunkResults.get(index)) {
+                throw new SplitRecordSetHandlerException(CAUSE);
+            }
+        }
+
+        @Override
+        protected void addToChunk(final Record record) {
+            chunks.add(record);
+        }
+
+        List<Record> getChunks() {
+            return chunks;
+        }
+
+        List<Boolean> getHandleChunkCalls() {
+            return handleChunkCalls;
+        }
+
+        void setUpHandleChunkResults(final Boolean... results) {
+            handleChunkResults = Arrays.asList(results);
+        }
+    }
+}
\ No newline at end of file

Reply via email to