[
https://issues.apache.org/jira/browse/BEAM-7043?focusedWorklogId=232824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-232824
]
ASF GitHub Bot logged work on BEAM-7043:
----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Apr/19 13:04
Start Date: 25/Apr/19 13:04
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #8390: [BEAM-7043]
Add DynamoDBIO
URL: https://github.com/apache/beam/pull/8390#discussion_r278496151
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamodbIO.java
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for writing to <a
href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
+ *
+ * <h3>Writing to DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<BatchWriteItemRequest> data = ...;
+ *
+ * data.apply(DynamodbIO.write()
+ * .withRetryConfiguration(
+ * DynamodbIO.RetryConfiguration.create(
+ * 4, org.joda.time.Duration.standardSeconds(10)))
+ * .withAWSClientsProvider(new BasisDynamodbProvider(accessKey, secretKey,
region))
+ * .withResultOutputTag(results));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ * <li>retry configuration
+ * <li>need to specify AwsClientsProvider. You can pass on the default one
BasisDynamodbProvider
+ * <li>an output tag where you can get results. Example in DynamodbIOTest
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class DynamodbIO {
+ public static Read read() {
+ return new AutoValue_DynamodbIO_Read.Builder()
+ .setNumOfItemPerSegment(Integer.MAX_VALUE)
+ .setNumOfSplits(1)
+ .build();
+ }
+
+ public static Write write() {
+ return new AutoValue_DynamodbIO_Write.Builder().build();
+ }
+
+ /** Read data from DynamoDB by BatchGetItemRequest. Todo: doc more */
+ @AutoValue
+ public abstract static class Read
+ extends PTransform<PBegin, PCollection<Map<String, AttributeValue>>> {
+ @Nullable
+ abstract AwsClientsProvider getAWSClientsProvider();
+
+ @Nullable
+ abstract String getTableName();
+
+ @Nullable
+ abstract String getFilterExpression();
+
+ @Nullable
+ abstract Map<String, AttributeValue> getExpressionAttributeValues();
+
+ @Nullable
+ abstract Map<String, String> getExpressionAttributeNames();
+
+ @Nullable
+ abstract String getProjectionExpression();
+
+ abstract int getNumOfItemPerSegment();
+
+ abstract int getNumOfSplits();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setAWSClientsProvider(AwsClientsProvider
clientProvider);
+
+ abstract Builder setTableName(String tableName);
+
+ abstract Builder setFilterExpression(String filterExpression);
+
+ abstract Builder setExpressionAttributeValues(
+ Map<String, AttributeValue> filterExpressionMapValue);
+
+ abstract Builder setExpressionAttributeNames(Map<String, String>
filterExpressionMapName);
+
+ abstract Builder setProjectionExpression(String projectionExpression);
+
+ abstract Builder setNumOfItemPerSegment(int numOfItemPerSegment);
+
+ abstract Builder setNumOfSplits(int numOfSplits);
+
+ abstract Read build();
+ }
+ // TODO: add more AwsClientProviders
+ /**
+ * Allows to specify custom {@link AwsClientsProvider}. {@link
AwsClientsProvider} provides
+ * {@link AmazonDynamoDB} and {@link AmazonCloudWatch} instances which are
later used for
+ * communication with DynamoDB. You should use this method if {@link
+ * Read#withAWSClientsProvider(AwsClientsProvider)} does not suit your
needs.
+ */
+ public Read withAWSClientsProvider(AwsClientsProvider awsClientsProvider) {
+ return toBuilder().setAWSClientsProvider(awsClientsProvider).build();
+ }
+
+ /**
+ * Specify credential details and region to be used to write to dynamo. If
you need more
+ * sophisticated credential protocol, then you should look at {@link
+ * DynamodbIO.Write#withAWSClientsProvider(AwsClientsProvider)}.
+ */
+ public Read withAWSClientsProvider(String awsAccessKey, String
awsSecretKey, Regions region) {
+ return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
+ }
+
+ /**
+ * Specify credential details and region to be used to write to DynamoDB.
If you need more
+ * sophisticated credential protocol, then you should look at {@link
+ * DynamodbIO.Write#withAWSClientsProvider(AwsClientsProvider)}.
+ *
+ * <p>The {@code serviceEndpoint} sets an alternative service host. This
is useful to execute
+ * the tests with a DynamoDB service emulator.
+ */
+ public Read withAWSClientsProvider(
+ String awsAccessKey, String awsSecretKey, Regions region, String
serviceEndpoint) {
+ return withAWSClientsProvider(
+ new BasisDynamodbProvider(awsAccessKey, awsSecretKey, region,
serviceEndpoint));
+ }
+
+ public Read withTableName(String tableName) {
+ return toBuilder().setTableName(tableName).build();
+ }
+
+ public Read withFilterExpression(String filterExpression) {
+ return toBuilder().setFilterExpression(filterExpression).build();
+ }
+
+ public Read withExpressionAttributeNames(Map<String, String>
filterExpressionMapName) {
+ return
toBuilder().setExpressionAttributeNames(filterExpressionMapName).build();
+ }
+
+ public Read withExpressionAttributeValues(
+ Map<String, AttributeValue> filterExpressionMapValue) {
+ return
toBuilder().setExpressionAttributeValues(filterExpressionMapValue).build();
+ }
+
+ public Read withProjectionExpression(String projectionExpression) {
+ return toBuilder().setProjectionExpression(projectionExpression).build();
+ }
+
+ public Read withNumOfItemPerSegment(int numOfItemPerSegment) {
+ return toBuilder().setNumOfItemPerSegment(numOfItemPerSegment).build();
+ }
+
+ public Read withNumOfSplits(int numOfSplits) {
+ return toBuilder().setNumOfSplits(numOfSplits).build();
+ }
+
+ @Override
+ public PCollection<Map<String, AttributeValue>> expand(PBegin input) {
+
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ new DynamodbBoundedSource(
Review comment:
Change this to pass the `Read` object here and extract the fields from it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 232824)
Time Spent: 20m (was: 10m)
> Add DynamoDBIO
> --------------
>
> Key: BEAM-7043
> URL: https://issues.apache.org/jira/browse/BEAM-7043
> Project: Beam
> Issue Type: New Feature
> Components: io-java-aws
> Reporter: Cam Mach
> Assignee: Cam Mach
> Priority: Minor
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently we don't have any feature to write data to AWS DynamoDB. This
> feature will enable us to send data to DynamoDB.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)