simonbence commented on a change in pull request #5761:
URL: https://github.com/apache/nifi/pull/5761#discussion_r819619231



##########
File path: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including 
partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the 
insert in multiple chunks in order to overcome DynamoDB's limitation on batch 
writing. " +
+        "This might result partially processed FlowFiles in which case the 
FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the 
already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = 
PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of 
chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = 
"DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"DynamoDB range key error"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB 
key not found"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"DynamoDB exception message"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error 
code"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error 
message"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error 
type"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error 
service"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB 
error is retryable"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB 
error request id"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB 
error status code"),
+        @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception 
message on creating item")
+})
+@ReadsAttribute(attribute = 
PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of 
chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one 
batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = 
"dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new 
AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition 
Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new 
AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition 
Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name 
defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new 
AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The 
incoming Records must not contain field with the same name defined by the 
\"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", 
"Sort By Field",
+            "With this strategy, the processor will use the value of the field 
identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new 
AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the 
original record's position in the incoming FlowFile. This will be used as sort 
key value.");
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, 
PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign 
partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the 
DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field 
value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used 
as the value of the partition key when using \"Partition by attribute\" 
partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign 
sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the 
DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = 
flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? 
Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new 
DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, 
flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = 
recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), 
alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + 
e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new 
HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, 
String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, 
attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && 
result.getThrowable().getCause() instanceof 
ProvisionedThroughputExceededException) {

Review comment:
       As the throwable is populated only in case of an error, that would introduce an 
extra level of if statements.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to