[
https://issues.apache.org/jira/browse/BEAM-7043?focusedWorklogId=256959&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-256959
]
ASF GitHub Bot logged work on BEAM-7043:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jun/19 16:41
Start Date: 10/Jun/19 16:41
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #8390: [BEAM-7043]
Add DynamoDBIO
URL: https://github.com/apache/beam/pull/8390#discussion_r292090757
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for reading from and writing to <a
+ * href="https://aws.amazon.com/dynamodb/">DynamoDB</a>.
+ *
+ * <h3>Writing to DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * DynamoDBIO.DynamoDBConfiguration config = DynamoDBIO.DynamoDBConfiguration.create(
+ *     "region", "accessKey", "secretKey");
+ * PCollection<BatchWriteItemRequest> data = ...;
+ *
+ * data.apply(DynamoDBIO.write()
+ *     .withRetryConfiguration(
+ *         DynamoDBIO.RetryConfiguration.create(4, org.joda.time.Duration.standardSeconds(10)))
+ *     .withDynamoDBConfiguration(config)
+ *     .withResultOutputTag(results));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following:
+ *
+ * <ul>
+ * <li>A retry configuration
+ * <li>A DynamoDB configuration
+ * <li>An output tag where you can get the write results (see DynamoDBIOTest for an example)
+ * </ul>
+ *
+ * <h3>Reading from DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * DynamoDBIO.DynamoDBConfiguration config = DynamoDBIO.DynamoDBConfiguration.create(
+ *     "endpointUrl", "region", "accessKey", "secretKey");
+ * PCollection<Map<String, AttributeValue>> actual =
+ *     pipeline.apply(DynamoDBIO.read()
+ *         .withScanRequestFn((v) -> new ScanRequest(tableName).withTotalSegments(10))
+ *         .withDynamoDBConfiguration(config));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following:
+ *
+ * <ul>
+ * <li>A DynamoDB configuration
+ * <li>A ScanRequestFn that builds a ScanRequest object with at least the table name and the
+ *     total number of segments; note that this number should be based on the number of your
+ *     workers
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class DynamoDBIO {
+ public static Read read() {
+ return new AutoValue_DynamoDBIO_Read.Builder().build();
+ }
+
+ public static Write write() {
+ return new AutoValue_DynamoDBIO_Write.Builder().build();
+ }
+
+  /** A config object used to construct an AmazonDynamoDB client. */
+ @AutoValue
+ public abstract static class DynamoDBConfiguration implements Serializable {
+ @Nullable
+ abstract ValueProvider<String> getRegion();
+
+ @Nullable
+ abstract ValueProvider<String> getEndpointUrl();
+
+ @Nullable
+ abstract ValueProvider<String> getAwsAccessKey();
+
+ @Nullable
+ abstract ValueProvider<String> getAwsSecretKey();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setRegion(ValueProvider<String> region);
+
+ abstract Builder setEndpointUrl(ValueProvider<String> endpointUrl);
+
+ abstract Builder setAwsAccessKey(ValueProvider<String> accessKey);
+
+ abstract Builder setAwsSecretKey(ValueProvider<String> secretKey);
+
+ abstract DynamoDBConfiguration build();
+ }
+
+    public static DynamoDBConfiguration create(String region, String accessKey, String secretKey) {
+      checkArgument(region != null, "region cannot be null");
+      checkArgument(accessKey != null, "accessKey cannot be null");
+      checkArgument(secretKey != null, "secretKey cannot be null");
+ return create(
+ null,
+ ValueProvider.StaticValueProvider.of(region),
+ ValueProvider.StaticValueProvider.of(accessKey),
+ ValueProvider.StaticValueProvider.of(secretKey));
+ }
+
+    public static DynamoDBConfiguration create(
+        String endpointUrl, String region, String accessKey, String secretKey) {
+      checkArgument(region != null, "region cannot be null");
+      checkArgument(accessKey != null, "accessKey cannot be null");
+      checkArgument(secretKey != null, "secretKey cannot be null");
+ return create(
+ ValueProvider.StaticValueProvider.of(endpointUrl),
+ ValueProvider.StaticValueProvider.of(region),
+ ValueProvider.StaticValueProvider.of(accessKey),
+ ValueProvider.StaticValueProvider.of(secretKey));
+ }
+
+ public static DynamoDBConfiguration create(
+ ValueProvider<String> endpointUrl,
+ ValueProvider<String> region,
+ ValueProvider<String> accessKey,
+ ValueProvider<String> secretKey) {
+ checkArgument(region != null, "region can not be null");
+ checkArgument(accessKey != null, "accessKey can not be null");
+ checkArgument(secretKey != null, "secretKey can not be null");
+ return new AutoValue_DynamoDBIO_DynamoDBConfiguration.Builder()
+ .setEndpointUrl(endpointUrl)
+ .setRegion(region)
+ .setAwsAccessKey(accessKey)
+ .setAwsSecretKey(secretKey)
+ .build();
+ }
+
+ public DynamoDBConfiguration withRegion(String region) {
+ return withRegion(ValueProvider.StaticValueProvider.of(region));
+ }
+
+ public DynamoDBConfiguration withRegion(ValueProvider<String> region) {
+ return builder().setRegion(region).build();
+ }
+
+ public DynamoDBConfiguration withEndpointUrl(String endpointUrl) {
+      return withEndpointUrl(ValueProvider.StaticValueProvider.of(endpointUrl));
+ }
+
+    public DynamoDBConfiguration withEndpointUrl(ValueProvider<String> endpointUrl) {
+      return builder().setEndpointUrl(endpointUrl).build();
+    }
+
+ public DynamoDBConfiguration withAwsAccessKey(String accessKey) {
+ return withAwsAccessKey(ValueProvider.StaticValueProvider.of(accessKey));
+ }
+
+    public DynamoDBConfiguration withAwsAccessKey(ValueProvider<String> accessKey) {
+ return builder().setAwsAccessKey(accessKey).build();
+ }
+
+ public DynamoDBConfiguration withAwsSecretKey(String secretKey) {
+ return withAwsSecretKey(ValueProvider.StaticValueProvider.of(secretKey));
+ }
+
+    public DynamoDBConfiguration withAwsSecretKey(ValueProvider<String> secretKey) {
+ return builder().setAwsSecretKey(secretKey).build();
+ }
+
+ AmazonDynamoDB buildAmazonDynamoDB() {
+      AmazonDynamoDBClientBuilder builder = AmazonDynamoDBClientBuilder.standard();
+      if (getEndpointUrl() != null) {
+        builder.setEndpointConfiguration(
+            new AwsClientBuilder.EndpointConfiguration(getEndpointUrl().get(), getRegion().get()));
+ }
+ if (getEndpointUrl() == null && getRegion() != null) {
+ builder.setRegion(getRegion().get());
+ }
+ if (getAwsAccessKey() != null && getAwsSecretKey() != null) {
+ builder.setCredentials(
+ new AWSStaticCredentialsProvider(
+                new BasicAWSCredentials(getAwsAccessKey().get(), getAwsSecretKey().get())));
+ }
+ return builder.build();
+ }
+ }
+
+  /** Read data from DynamoDB and return a {@code PCollection<Map<String, AttributeValue>>}. */
+ @AutoValue
+ public abstract static class Read
+ extends PTransform<PBegin, PCollection<Map<String, AttributeValue>>> {
+ @Nullable
+ abstract DynamoDBConfiguration getDynamoDBConfiguration();
+
+ @Nullable
+ abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+      abstract Builder setDynamoDBConfiguration(DynamoDBConfiguration dynamoDBConfiguration);
+
+      abstract Builder setScanRequestFn(SerializableFunction<Void, ScanRequest> fn);
+
+ abstract Read build();
+ }
+
+    public Read withDynamoDBConfiguration(DynamoDBConfiguration dynamoDBConfiguration) {
+      return toBuilder().setDynamoDBConfiguration(dynamoDBConfiguration).build();
+ }
+
+ public Read withScanRequestFn(SerializableFunction<Void, ScanRequest> fn) {
+ return toBuilder().setScanRequestFn(fn).build();
+ }
+
+ @Override
+ public PCollection<Map<String, AttributeValue>> expand(PBegin input) {
+ checkArgument((getScanRequestFn() != null), "withScanRequestFn() is
required");
+ checkArgument(
+ (getDynamoDBConfiguration() != null), "withDynamoDBConfiguration()
is required");
+ checkArgument(
+ (getScanRequestFn().apply(null).getTotalSegments() != null
+ && getScanRequestFn().apply(null).getTotalSegments() > 0),
+ "TotalSegments is required with withScanRequestFn() and greater
zero");
+
+      return input.apply(org.apache.beam.sdk.io.Read.from(new DynamoDBBoundedSource(this, 0)));
+ }
+ }
+
+ /**
+   * A POJO encapsulating a configuration for retry behavior when issuing requests to DynamoDB. A
+   * retry will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes
+   * first, for any of the following exceptions:
+   *
+   * <ul>
+   * <li>{@link IOException}
+   * </ul>
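+   *
+   * <p>For example (mirroring the class-level javadoc above), a configuration allowing up to 4
+   * attempts within a total duration of 10 seconds can be created with:
+   *
+   * <pre>{@code
+   * DynamoDBIO.RetryConfiguration retryConfig =
+   *     DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10));
+   * }</pre>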
+ */
+ @AutoValue
+ public abstract static class RetryConfiguration implements Serializable {
+ @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();
+
+ abstract int getMaxAttempts();
+
+ abstract Duration getMaxDuration();
+
+ abstract DynamoDBIO.RetryConfiguration.RetryPredicate getRetryPredicate();
+
+ abstract DynamoDBIO.RetryConfiguration.Builder builder();
+
+    public static DynamoDBIO.RetryConfiguration create(int maxAttempts, Duration maxDuration) {
+ checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
+ checkArgument(
+ maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+ "maxDuration should be greater than 0");
+ return new AutoValue_DynamoDBIO_RetryConfiguration.Builder()
+ .setMaxAttempts(maxAttempts)
+ .setMaxDuration(maxDuration)
+ .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+ .build();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+      abstract DynamoDBIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts);
+
+      abstract DynamoDBIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration);
+
+ abstract DynamoDBIO.RetryConfiguration.Builder setRetryPredicate(
+ RetryPredicate retryPredicate);
+
+ abstract DynamoDBIO.RetryConfiguration build();
+ }
+
+ /**
+     * An interface used to control if we retry the BatchWriteItemRequest call when a {@link
+     * Throwable} occurs. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries
+     * to resend the requests to the DynamoDB server if the {@link RetryConfiguration} permits it.
+ */
+ @FunctionalInterface
+ interface RetryPredicate extends Predicate<Throwable>, Serializable {}
+
+ private static class DefaultRetryPredicate implements RetryPredicate {
+ private static final ImmutableSet<Integer> ELIGIBLE_CODES =
+ ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE);
+
+      @Override
+      public boolean test(Throwable throwable) {
+        // Retry on IOException, or on a DynamoDB exception whose status code is eligible.
+        return throwable instanceof IOException
+            || (throwable instanceof AmazonDynamoDBException
+                && ELIGIBLE_CODES.contains(((AmazonDynamoDBException) throwable).getStatusCode()));
+      }
+ }
+ }
+
+ /** Implementation of {@link #write}. */
+ @AutoValue
+  public abstract static class Write
+      extends PTransform<PCollection<BatchWriteItemRequest>, PCollectionTuple> {
Review comment:
Since this is a sink, you may not really care about multiple outputs in the
return type of the `Write` definition. You can achieve multiple outputs by
creating subgraphs in the pipeline with `PCollectionTuple` and then chaining
the write transform onto each output path.
We could have decided to make the signature
`PTransform<PCollection<BatchWriteItemRequest>, PDone>` too, but I asked
explicitly to use `PCollection<Void>` as the return type to enable, in the
future, patterns like waiting for one PCollection write to finish before
starting another; that's the reason why (sorry if I was not explicit about
this).
I still have to check `SnsIO` to see why they took that approach, but I would
not be surprised if we can achieve the same without needing to return a
`PCollectionTuple` in the Write.
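To illustrate the idea, here is a rough, untested sketch. The tag names, the
routing logic, and the simplified `Write` returning `PCollection<Void>` are
placeholders and assumptions for illustration, not the current PR code; it
also assumes `org.apache.beam.sdk.transforms.Wait` and the usual
`org.apache.beam.sdk.transforms`/`org.apache.beam.sdk.values` imports:

    // Tags for two hypothetical output branches (placeholder names).
    final TupleTag<BatchWriteItemRequest> smallBatches = new TupleTag<BatchWriteItemRequest>() {};
    final TupleTag<BatchWriteItemRequest> largeBatches = new TupleTag<BatchWriteItemRequest>() {};

    PCollection<BatchWriteItemRequest> requests = ...;

    // Split the input into a PCollectionTuple upstream of the sink.
    PCollectionTuple branches =
        requests.apply(
            ParDo.of(
                    new DoFn<BatchWriteItemRequest, BatchWriteItemRequest>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        // Route on some application-specific criterion (placeholder logic).
                        if (c.element().getRequestItems().size() <= 10) {
                          c.output(c.element()); // main output -> smallBatches
                        } else {
                          c.output(largeBatches, c.element());
                        }
                      }
                    })
                .withOutputTags(smallBatches, TupleTagList.of(largeBatches)));

    // Chain the write onto each branch. With a PCollection<Void> result,
    // Wait.on can sequence the writes, e.g. start the second write only
    // after the first one has finished.
    PCollection<Void> firstDone =
        branches
            .get(smallBatches)
            .apply("WriteSmall", DynamoDBIO.write().withDynamoDBConfiguration(config));
    branches
        .get(largeBatches)
        .apply(Wait.on(firstDone))
        .apply("WriteLarge", DynamoDBIO.write().withDynamoDBConfiguration(config));

This is why I prefer the simpler signature: the multi-output case stays
expressible in the pipeline itself, and the `PCollection<Void>` result keeps
the door open for `Wait.on`-style sequencing.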
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 256959)
Time Spent: 14h 40m (was: 14.5h)
> Add DynamoDBIO
> --------------
>
> Key: BEAM-7043
> URL: https://issues.apache.org/jira/browse/BEAM-7043
> Project: Beam
> Issue Type: New Feature
> Components: io-java-aws
> Reporter: Cam Mach
> Assignee: Cam Mach
> Priority: Minor
> Fix For: 2.14.0
>
> Time Spent: 14h 40m
> Remaining Estimate: 0h
>
> Currently we don't have any feature to write data to AWS DynamoDB. This
> feature will enable us to send data to DynamoDB.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)