simonbence commented on a change in pull request #5761:
URL: https://github.com/apache/nifi/pull/5761#discussion_r814839885



##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including 
partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the 
insert in multiple chunks in order to overcome DynamoDB's limitation on batch 
writing. " +
+        "This might result partially processed FlowFiles in which case the 
FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the 
already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = 
PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks 
successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
error status code"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception 
message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.CHUNKS_PROCESSED_ATTRIBUTE, 
description = "Number of chunks successfully inserted into DynamoDB. If not 
set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one 
batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String CHUNKS_PROCESSED_ATTRIBUTE = 
"dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new 
AllowableValue("by-field", "Partition by field",
+            "Uses the value of the Record field identified by the \"Partition 
Field\" property as partition field.");

Review comment:
       As I understand it, in DynamoDB terminology the "key" itself is the 
value of a given field; that field is designated as the "source" of the key. 
The description here tries to communicate that if the strategy is set to 
"Partition by field", then we will simply use the (record's) field name and 
value for partitioning. (In practice, to use the term from the Java API, we 
will add that key-value pair to the Item as a "key component".)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to