turcsanyip commented on a change in pull request #5761:
URL: https://github.com/apache/nifi/pull/5761#discussion_r812022606



##########
File path: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java
##########
@@ -257,7 +257,7 @@ private void enableReader() {
     }
 
     private void triggerProcessor(final String input) throws FileNotFoundException {
-        runner.enqueue(new FileInputStream(input));
+            runner.enqueue(new FileInputStream(input));

Review comment:
      Please revert this accidental indentation change.

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),

Review comment:
      I would use `DynamoDB` in these attribute descriptions, the same way AWS spells its name.
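
The chunking behavior described in the `CapabilityDescription` quoted above comes from DynamoDB's `BatchWriteItem` cap of 25 items per request. A dependency-free sketch of that splitting step (illustrative only; the PR actually implements this through `SplitRecordSetHandler`, not a helper like this):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSketch {
    // DynamoDB's BatchWriteItem accepts at most 25 items per request,
    // hence the processor's MAXIMUM_CHUNK_SIZE of 25.
    static final int MAXIMUM_CHUNK_SIZE = 25;

    // Split the record set into chunks no larger than the batch limit.
    static <T> List<List<T>> chunk(final List<T> records) {
        final List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < records.size(); i += MAXIMUM_CHUNK_SIZE) {
            chunks.add(new ArrayList<>(records.subList(i, Math.min(i + MAXIMUM_CHUNK_SIZE, records.size()))));
        }
        return chunks;
    }

    public static void main(final String[] args) {
        final List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 60; i++) {
            records.add(i);
        }
        // 60 records -> chunks of 25, 25 and 10
        final List<List<Integer>> chunks = chunk(records);
        System.out.println(chunks.size() + " chunks, last holds " + chunks.get(2).size());
    }
}
```

Tracking how many chunks were already written (the `dynamodb.chunks.processed` attribute) is what lets a retried FlowFile skip the chunks that succeeded on the previous attempt.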

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition field.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("by-attribute", "Partition by attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key and the value of the \"Partition Field\" as field name " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Field\". " +
+            "With this strategy, it is recommended to use sort key as without sort key DynamoDB will not allow multiple Items with the same partition value.");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("generated", "Generated",
+            "The processor will use the value of \"Partition Field\" property for name and a generated UUID as value for the partition key.");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("none", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("by-field", "Sort by field",
+            "With this strategy, the processor will use the name and value of the field identified by \"Sort Key Field\" as sort key.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("by-sequence", "Generate sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. The field name is determined by \"Sort Key Field\" as sort key.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-strategy")
+            .displayName("Partition Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key to the inserted Items. Partition key is also known as hash key.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-field")
+            .displayName("Partition Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The name of the Item field will be used as partition key. Depending on the \"Partition Strategy\" this might be a field from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-attribute")
+            .displayName("Partition Attribute")
+            .required(true)
+            .dependsOn(PARTITION_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute will be used as the value of the partition key when using \"Partition by attribute\" partition strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items. Sort key is also known as range key.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies which field will be used as sort key when the sort key strategy is set to \"Sort by field\".")
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(
+                RECORD_READER,
+                new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+                TABLE,
+                PARTITION_STRATEGY,
+                PARTITION_FIELD,
+                PARTITION_ATTRIBUTE,
+                SORT_KEY_STRATEGY,
+                SORT_KEY_FIELD,
+                REGION,
+                TIMEOUT,
+                PROXY_HOST,
+                PROXY_HOST_PORT,
+                PROXY_USERNAME,
+                PROXY_PASSWORD,

Review comment:
       `ProxyConfigurationService` should be used in new components instead of 
the proxy properties.
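
As background for the `unprocessed` attributes quoted in this hunk: `BatchWriteItem` may return part of a batch as unprocessed (for example when provisioned throughput is exceeded), and callers typically retry only that remainder. A dependency-free sketch of the retry loop; `writeBatch` below is a hypothetical stand-in for the AWS SDK call, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class RetrySketch {
    // Hypothetical stand-in for BatchWriteItem: pretend the service accepts at
    // most `capacity` items per call and reports the rest back as unprocessed.
    static List<String> writeBatch(final List<String> items, final int capacity) {
        return new ArrayList<>(items.subList(Math.min(capacity, items.size()), items.size()));
    }

    // Keep retrying only the unprocessed remainder, bounded by maxAttempts.
    static int writeAll(final List<String> items, final int capacity, final int maxAttempts) {
        List<String> remaining = new ArrayList<>(items);
        int attempts = 0;
        while (!remaining.isEmpty() && attempts < maxAttempts) {
            remaining = writeBatch(remaining, capacity);
            attempts++;
        }
        return attempts;
    }

    public static void main(final String[] args) {
        final List<String> items = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            items.add("item-" + i);
        }
        // 10 items at 4 per call: 3 calls until nothing remains unprocessed
        System.out.println(writeAll(items, 4, 10));
    }
}
```

In the processor, anything still unprocessed after the attempts is surfaced to the flow via the `dynamodb.key.error.unprocessed` attribute rather than retried forever.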

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(
+                RECORD_READER,
+                new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
Review comment:
      It is an old typo, but could you please fix `service` -> `Service` in the display name of the property? I think we can do it for all processors, not only here.

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records, the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result in partially processed FlowFiles, in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered to be 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered to be 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as the partition key.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("by-attribute", "Partition by attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key, and the value of \"Partition Field\" as the field name. " +
+            "The incoming Records must not contain a field with the name defined by \"Partition Field\". " +
+            "With this strategy, it is recommended to use a sort key, as without one DynamoDB will not allow multiple Items with the same partition key value.");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("generated", "Generated",
+            "The processor will use the value of the \"Partition Field\" property as the name and a generated UUID as the value of the partition key.");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("none", "None",
+            "The processor will not assign a sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("by-field", "Sort by field",
+            "With this strategy, the processor will use the name and value of the field identified by \"Sort Key Field\" as the sort key.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("by-sequence", "Generate sequence",
+            "The processor will assign a number to every Item based on the original record's position in the incoming FlowFile. The field name is determined by \"Sort Key Field\".");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-strategy")
+            .displayName("Partition Strategy")

Review comment:
       In my opinion `Partition Key *` would be better names, similar to `Sort Key *`.

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records, the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result in partially processed FlowFiles, in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered to be 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered to be 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as the partition key.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("by-attribute", "Partition by attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key, and the value of \"Partition Field\" as the field name. " +
+            "The incoming Records must not contain a field with the name defined by \"Partition Field\". " +
+            "With this strategy, it is recommended to use a sort key, as without one DynamoDB will not allow multiple Items with the same partition key value.");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("generated", "Generated",
+            "The processor will use the value of the \"Partition Field\" property as the name and a generated UUID as the value of the partition key.");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("none", "None",
+            "The processor will not assign a sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("by-field", "Sort by field",
+            "With this strategy, the processor will use the name and value of the field identified by \"Sort Key Field\" as the sort key.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("by-sequence", "Generate sequence",
+            "The processor will assign a number to every Item based on the original record's position in the incoming FlowFile. The field name is determined by \"Sort Key Field\".");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-strategy")
+            .displayName("Partition Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign a partition key to the inserted Items. The partition key is also known as the hash key.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-field")
+            .displayName("Partition Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The name of the Item field that will be used as the partition key. Depending on the \"Partition Strategy\", this might be a field from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-attribute")
+            .displayName("Partition Attribute")
+            .required(true)
+            .dependsOn(PARTITION_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using the \"Partition by attribute\" partition strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign a sort key to the inserted Items. The sort key is also known as the range key.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies which field will be used as the sort key when the sort key strategy is set to \"Sort by field\".")
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(
+                RECORD_READER,
+                new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+                TABLE,
+                PARTITION_STRATEGY,
+                PARTITION_FIELD,
+                PARTITION_ATTRIBUTE,
+                SORT_KEY_STRATEGY,
+                SORT_KEY_FIELD,
+                REGION,

Review comment:
       I would suggest moving this property up, just after Credential Provider Service.
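
To make the suggested reordering concrete, here is a standalone sketch (not part of the PR; plain strings stand in for the real `PropertyDescriptor` constants, whose names below are only assumed from the diff): `REGION` moves directly after the credentials provider service.

```java
import java.util.Arrays;
import java.util.List;

public class SuggestedPropertyOrder {
    public static void main(String[] args) {
        // Hypothetical ordering per the review suggestion: REGION placed
        // immediately after the AWS Credentials Provider Service entry.
        List<String> order = Arrays.asList(
                "RECORD_READER",
                "AWS_CREDENTIALS_PROVIDER_SERVICE",
                "REGION",
                "TABLE",
                "PARTITION_STRATEGY",
                "PARTITION_FIELD",
                "PARTITION_ATTRIBUTE",
                "SORT_KEY_STRATEGY",
                "SORT_KEY_FIELD"
        );
        // REGION now sits at index 2, right after the credentials service.
        System.out.println(order.indexOf("REGION")); // prints 2
    }
}
```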

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records, the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result in partially processed FlowFiles, in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered to be 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered to be 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as the partition key.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("by-attribute", "Partition by attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key, and the value of \"Partition Field\" as the field name. " +
+            "The incoming Records must not contain a field with the name defined by \"Partition Field\". " +
+            "With this strategy, it is recommended to use a sort key, as without one DynamoDB will not allow multiple Items with the same partition key value.");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("generated", "Generated",
+            "The processor will use the value of the \"Partition Field\" property as the name and a generated UUID as the value of the partition key.");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("none", "None",
+            "The processor will not assign a sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("by-field", "Sort by field",
+            "With this strategy, the processor will use the name and value of the field identified by \"Sort Key Field\" as the sort key.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("by-sequence", "Generate sequence",
+            "The processor will assign a number to every Item based on the original record's position in the incoming FlowFile. The field name is determined by \"Sort Key Field\".");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-strategy")
+            .displayName("Partition Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign a partition key to the inserted Items. The partition key is also known as the hash key.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-field")
+            .displayName("Partition Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The name of the Item field that will be used as the partition key. Depending on the \"Partition Strategy\", this might be a field from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-attribute")
+            .displayName("Partition Attribute")
+            .required(true)
+            .dependsOn(PARTITION_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using the \"Partition by attribute\" partition strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign a sort key to the inserted Items. The sort key is also known as the range key.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies which field will be used as the sort key when the sort key strategy is set to \"Sort by field\".")
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(

Review comment:
       It is a good practice to define a `private static final List<PropertyDescriptor> PROPERTIES` field and to return it from `getSupportedPropertyDescriptors()` instead of always creating the list and the overridden properties.
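
A minimal standalone sketch of that pattern (not part of the PR; plain strings stand in for the NiFi `PropertyDescriptor` instances): the list is built once at class-load time, wrapped as unmodifiable, and the getter returns the shared instance instead of rebuilding it on every call.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PropertyListHolder {
    // Built once when the class is loaded; unmodifiable so callers
    // cannot mutate the shared list. Strings stand in for the real
    // PropertyDescriptor constants.
    private static final List<String> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            "record-reader",
            "aws-credentials-provider-service",
            "table"
    ));

    public List<String> getSupportedPropertyDescriptors() {
        // Returns the same shared instance on every call, avoiding a
        // fresh allocation each time the framework asks for properties.
        return PROPERTIES;
    }

    public static void main(String[] args) {
        PropertyListHolder holder = new PropertyListHolder();
        // Identity check: both calls return the very same list object.
        System.out.println(holder.getSupportedPropertyDescriptors()
                == holder.getSupportedPropertyDescriptors()); // prints true
    }
}
```

The same shape applies to the processor: compute the `Arrays.asList(...)` (including the overridden credentials-provider descriptor) in a static initializer and have `getSupportedPropertyDescriptors()` return the cached field.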

##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo db unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "Dynamod db range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo db key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "Dynamo db exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition field.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("by-attribute", "Partition by attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key and the value of the \"Partition Field\" as field name " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Field\". " +
+            "With this strategy, it is recommended to use sort key as without sort key DynamoDB will not allow multiple Items with the same partition value.");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("generated", "Generated",
+            "The processor will use the value of \"Partition Field\" property for name and a generated UUID as value for the partition key.");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("none", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("by-field", "Sort by field",
+            "With this strategy, the processor will use the name and value of the field identified by \"Sort Key Field\" as sort key.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("by-sequence", "Generate sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. The field name is determined by \"Sort Key Field\" as sort key.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-strategy")
+            .displayName("Partition Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key to the inserted Items. Partition key is also known as hash key.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-field")
+            .displayName("Partition Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The name of the Item field will be used as partition key. Depending on the \"Partition Strategy\" this might be a field from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-attribute")
+            .displayName("Partition Attribute")
+            .required(true)
+            .dependsOn(PARTITION_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute will be used as the value of the partition key when using \"Partition by attribute\" partition strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)

Review comment:
       Is there a specific reason why a FlowFile attribute is not supported for the Sort Key (only for the Partition Key)?

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
[...]
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",

Review comment:
       Labels of `AllowableValue`s should follow title case rules, like property names.
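   For illustration, a hypothetical helper (not part of NiFi; the class and method names below are made up for this sketch) shows the convention the comment asks for, capitalizing every word of a label:

```java
// Hypothetical helper illustrating title case for AllowableValue labels,
// e.g. "Partition by field" becomes "Partition By Field".
class LabelCase {
    static String toTitleCase(String label) {
        StringBuilder sb = new StringBuilder();
        for (String word : label.split(" ")) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            // Capitalize the first letter, keep the rest of the word as-is.
            sb.append(Character.toUpperCase(word.charAt(0))).append(word.substring(1));
        }
        return sb.toString();
    }
}
```

   Applied to the values above, "Partition by field" would become "Partition By Field" and "Sort by field" would become "Sort By Field".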

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
[...]
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition field.");

Review comment:
       `... as partition key.` if I'm not wrong.

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
[...]
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("by-attribute", "Partition by attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key and the value of the \"Partition Field\" as field name " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Field\". " +
+            "With this strategy, it is recommended to use sort key as without sort key DynamoDB will not allow multiple Items with the same partition value.");

Review comment:
       I would omit the last sentence because it is a table design question and the user cannot decide on it here. Furthermore, the attribute value can be unique and appropriate as a primary key on its own.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

