[
https://issues.apache.org/jira/browse/BEAM-7043?focusedWorklogId=232832&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-232832
]
ASF GitHub Bot logged work on BEAM-7043:
----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Apr/19 13:04
Start Date: 25/Apr/19 13:04
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #8390: [BEAM-7043]
Add DynamoDBIO
URL: https://github.com/apache/beam/pull/8390#discussion_r278519939
##########
File path: sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamodbIO.java
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for writing to <a href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
+ *
+ * <h3>Writing to DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<BatchWriteItemRequest> data = ...;
+ *
+ * data.apply(DynamodbIO.write()
+ * .withRetryConfiguration(
+ * DynamodbIO.RetryConfiguration.create(
+ * 4, org.joda.time.Duration.standardSeconds(10)))
+ *     .withAWSClientsProvider(new BasisDynamodbProvider(accessKey, secretKey, region))
+ * .withResultOutputTag(results));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ * <li>retry configuration
+ *   <li>an AwsClientsProvider; you can pass the default one, BasisDynamodbProvider
+ *   <li>an output tag where you can get results; see the example in DynamodbIOTest
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class DynamodbIO {
+ public static Read read() {
+ return new AutoValue_DynamodbIO_Read.Builder()
+ .setNumOfItemPerSegment(Integer.MAX_VALUE)
+ .setNumOfSplits(1)
+ .build();
+ }
+
+ public static Write write() {
+ return new AutoValue_DynamodbIO_Write.Builder().build();
+ }
+
+ /** Read data from DynamoDB by BatchGetItemRequest. Todo: doc more */
+ @AutoValue
+ public abstract static class Read
+ extends PTransform<PBegin, PCollection<Map<String, AttributeValue>>> {
+ @Nullable
+ abstract AwsClientsProvider getAWSClientsProvider();
+
+ @Nullable
+ abstract String getTableName();
+
+ @Nullable
+ abstract String getFilterExpression();
+
+ @Nullable
+ abstract Map<String, AttributeValue> getExpressionAttributeValues();
+
+ @Nullable
+ abstract Map<String, String> getExpressionAttributeNames();
+
+ @Nullable
+ abstract String getProjectionExpression();
+
+ abstract int getNumOfItemPerSegment();
+
+ abstract int getNumOfSplits();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+      abstract Builder setAWSClientsProvider(AwsClientsProvider clientProvider);
+
+ abstract Builder setTableName(String tableName);
+
+ abstract Builder setFilterExpression(String filterExpression);
+
+ abstract Builder setExpressionAttributeValues(
+ Map<String, AttributeValue> filterExpressionMapValue);
+
+      abstract Builder setExpressionAttributeNames(Map<String, String> filterExpressionMapName);
+
+ abstract Builder setProjectionExpression(String projectionExpression);
+
+ abstract Builder setNumOfItemPerSegment(int numOfItemPerSegment);
+
+ abstract Builder setNumOfSplits(int numOfSplits);
+
+ abstract Read build();
+ }
+ // TODO: add more AwsClientProviders
+ /**
+   * Allows to specify custom {@link AwsClientsProvider}. {@link AwsClientsProvider} provides
+   * {@link AmazonDynamoDB} and {@link AmazonCloudWatch} instances which are later used for
+   * communication with DynamoDB. You should use this method if {@link
+   * Read#withAWSClientsProvider(String, String, Regions)} does not suit your needs.
+ */
+ public Read withAWSClientsProvider(AwsClientsProvider awsClientsProvider) {
+ return toBuilder().setAWSClientsProvider(awsClientsProvider).build();
+ }
+
+ /**
+   * Specify credential details and region to be used to read from DynamoDB. If you need a more
+   * sophisticated credential protocol, then you should look at {@link
+   * DynamodbIO.Read#withAWSClientsProvider(AwsClientsProvider)}.
+ */
+  public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
+ return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
+ }
+
+ /**
+   * Specify credential details and region to be used to read from DynamoDB. If you need a more
+   * sophisticated credential protocol, then you should look at {@link
+   * DynamodbIO.Read#withAWSClientsProvider(AwsClientsProvider)}.
+   *
+   * <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
+   * the tests with a DynamoDB service emulator.
+ */
+ public Read withAWSClientsProvider(
+      String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
+    return withAWSClientsProvider(
+        new BasisDynamodbProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
+ }
+
+ public Read withTableName(String tableName) {
+ return toBuilder().setTableName(tableName).build();
+ }
+
+ public Read withFilterExpression(String filterExpression) {
+ return toBuilder().setFilterExpression(filterExpression).build();
+ }
+
+  public Read withExpressionAttributeNames(Map<String, String> filterExpressionMapName) {
+    return toBuilder().setExpressionAttributeNames(filterExpressionMapName).build();
+ }
+
+ public Read withExpressionAttributeValues(
+ Map<String, AttributeValue> filterExpressionMapValue) {
+    return toBuilder().setExpressionAttributeValues(filterExpressionMapValue).build();
+ }
+
+ public Read withProjectionExpression(String projectionExpression) {
+ return toBuilder().setProjectionExpression(projectionExpression).build();
+ }
+
+ public Read withNumOfItemPerSegment(int numOfItemPerSegment) {
+ return toBuilder().setNumOfItemPerSegment(numOfItemPerSegment).build();
+ }
+
+ public Read withNumOfSplits(int numOfSplits) {
+ return toBuilder().setNumOfSplits(numOfSplits).build();
+ }
+
+ @Override
+ public PCollection<Map<String, AttributeValue>> expand(PBegin input) {
+
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ new DynamodbBoundedSource(
+ getAWSClientsProvider(),
+ getTableName(),
+ getFilterExpression(),
+ getExpressionAttributeNames(),
+ getExpressionAttributeValues(),
+ getProjectionExpression(),
+ getNumOfItemPerSegment(),
+ getNumOfSplits(),
+ 0)));
+ }
+ }
+
+ /**
+   * A POJO encapsulating a configuration for retry behavior when issuing requests to DynamoDB. A
+   * retry will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes
+ * first, for any of the following exceptions:
+ *
+ * <ul>
+ * <li>{@link IOException}
+ * </ul>
+ */
+ @AutoValue
+ public abstract static class RetryConfiguration implements Serializable {
+ @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();
+
+ abstract int getMaxAttempts();
+
+ abstract Duration getMaxDuration();
+
+ abstract DynamodbIO.RetryConfiguration.RetryPredicate getRetryPredicate();
+
+ abstract DynamodbIO.RetryConfiguration.Builder builder();
+
+    public static DynamodbIO.RetryConfiguration create(int maxAttempts, Duration maxDuration) {
+ checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
+ checkArgument(
+ maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+ "maxDuration should be greater than 0");
+ return new AutoValue_DynamodbIO_RetryConfiguration.Builder()
+ .setMaxAttempts(maxAttempts)
+ .setMaxDuration(maxDuration)
+ .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+ .build();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+      abstract DynamodbIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts);
+
+      abstract DynamodbIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration);
+
+ abstract DynamodbIO.RetryConfiguration.Builder setRetryPredicate(
+ RetryPredicate retryPredicate);
+
+ abstract DynamodbIO.RetryConfiguration build();
+ }
+
+ /**
+     * An interface used to control if we retry the BatchWriteItemRequest call when a {@link
+     * Throwable} occurs. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries
+     * to resend the requests to DynamoDB if the {@link RetryConfiguration} permits it.
+ */
+ @FunctionalInterface
+ interface RetryPredicate extends Predicate<Throwable>, Serializable {}
+
+ private static class DefaultRetryPredicate implements RetryPredicate {
+ private static final ImmutableSet<Integer> ELIGIBLE_CODES =
+ ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE);
+
+ @Override
+ public boolean test(Throwable throwable) {
+ return (throwable instanceof IOException
+ || (throwable instanceof AmazonDynamoDBException)
+ || (throwable instanceof AmazonDynamoDBException
Review comment:
Remove the first condition of the pair; it will already be true because of the
previous line.
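
For context, one possible reading of the suggestion, as a sketch only (the quoted
diff is cut off above, so the status-code clause below is an assumption based on
the ELIGIBLE_CODES set declared earlier in the diff, not the PR's actual code):

    @Override
    public boolean test(Throwable throwable) {
      // Retry on generic IO failures, or on DynamoDB service errors whose HTTP
      // status code is in ELIGIBLE_CODES (e.g. 503 Service Unavailable).
      return throwable instanceof IOException
          || (throwable instanceof AmazonDynamoDBException
              && ELIGIBLE_CODES.contains(((AmazonDynamoDBException) throwable).getStatusCode()));
    }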
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 232832)
> Add DynamoDBIO
> --------------
>
> Key: BEAM-7043
> URL: https://issues.apache.org/jira/browse/BEAM-7043
> Project: Beam
> Issue Type: New Feature
> Components: io-java-aws
> Reporter: Cam Mach
> Assignee: Cam Mach
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Currently we don't have any feature to write data to AWS DynamoDB. This
> feature will enable us to send data to DynamoDB.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)