[ 
https://issues.apache.org/jira/browse/BEAM-7043?focusedWorklogId=248733&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-248733
 ]

ASF GitHub Bot logged work on BEAM-7043:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/May/19 10:04
            Start Date: 27/May/19 10:04
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on pull request #8390:  [BEAM-7043] 
Add DynamoDBIO
URL: https://github.com/apache/beam/pull/8390#discussion_r287729587
 
 

 ##########
 File path: 
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
 ##########
 @@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.dynamodb;
+
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.ScanRequest;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for writing to <a 
href="https://aws.amazon.com/dynamodb/";>DynamoDB</a>.
+ *
+ * <h3>Writing to DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * DynamoDBIO.DynamoDBConfiguration config = 
DynamoDBIO.DynamoDBConfiguration.create(
+ *     "region", "accessKey", "secretKey");
+ * PCollection<BatchWriteItemRequest> data = ...;
+ *
+ * data.apply(DynamoDBIO.write()
+ *     .withRetryConfiguration(
+ *        DynamoDBIO.RetryConfiguration.create(
+ *          4, org.joda.time.Duration.standardSeconds(10)))
+ *     .withDynamoDBConfiguration(config)
+ *     .withResultOutputTag(results));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ *   <li>Retry configuration
+ *   <li>DynamoDb configuration
+ *   <li>An output tag where you can get results. Example in DynamoDBIOTest
+ * </ul>
+ *
+ * <h3>Reading from DynamoDB</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * DynamoDBIO.DynamoDBConfiguration config = 
DynamoDBIO.DynamoDBConfiguration.create(
+ *     "endpointUrl", "region", "accessKey", "secretKey");
+ * PCollection<Map<String, AttributeValue>> actual =
+ *     pipeline.apply(DynamoDBIO.read()
+ *         .withScanRequestFn((v) -> new 
ScanRequest(tableName).withTotalSegment(10))
+ *         .withDynamoDBConfiguration(config));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ *   <li>DynamoDb configuration
+ *   <li>ScanRequestFn, which you build a ScanRequest object with at least 
table name and total
+ *       number of segment. Note This number should base on the number of your 
workers
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class DynamoDBIO {
+  public static Read read() {
+    return new AutoValue_DynamoDBIO_Read.Builder().build();
+  }
+
+  public static Write write() {
+    return new AutoValue_DynamoDBIO_Write.Builder().build();
+  }
+
+  /** A config object used to construct AmazonDynamoDB client object. */
+  @AutoValue
+  public abstract static class DynamoDBConfiguration implements Serializable {
+    @Nullable
+    abstract ValueProvider<String> getRegion();
+
+    @Nullable
+    abstract ValueProvider<String> getEndpointUrl();
+
+    @Nullable
+    abstract ValueProvider<String> getAwsAccessKey();
+
+    @Nullable
+    abstract ValueProvider<String> getAwsSecretKey();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setRegion(ValueProvider<String> region);
+
+      abstract Builder setEndpointUrl(ValueProvider<String> endpointUrl);
+
+      abstract Builder setAwsAccessKey(ValueProvider<String> accessKey);
+
+      abstract Builder setAwsSecretKey(ValueProvider<String> secretKey);
+
+      abstract DynamoDBConfiguration build();
+    }
+
+    public static DynamoDBConfiguration create(String region, String 
accessKey, String secretKey) {
+      checkArgument(region != null, "region can not be null");
+      checkArgument(accessKey != null, "accessKey can not be null");
+      checkArgument(secretKey != null, "secretKey can not be null");
+      return create(
+          null,
+          ValueProvider.StaticValueProvider.of(region),
+          ValueProvider.StaticValueProvider.of(accessKey),
+          ValueProvider.StaticValueProvider.of(secretKey));
+    }
+
+    public static DynamoDBConfiguration create(
+        String endpointUrl, String region, String accessKey, String secretKey) 
{
+      checkArgument(region != null, "region can not be null");
+      checkArgument(accessKey != null, "accessKey can not be null");
+      checkArgument(secretKey != null, "secretKey can not be null");
+      return create(
+          ValueProvider.StaticValueProvider.of(endpointUrl),
+          ValueProvider.StaticValueProvider.of(region),
+          ValueProvider.StaticValueProvider.of(accessKey),
+          ValueProvider.StaticValueProvider.of(secretKey));
+    }
+
+    public static DynamoDBConfiguration create(
+        ValueProvider<String> endpointUrl,
+        ValueProvider<String> region,
+        ValueProvider<String> accessKey,
+        ValueProvider<String> secretKey) {
+      checkArgument(region != null, "region can not be null");
+      checkArgument(accessKey != null, "accessKey can not be null");
+      checkArgument(secretKey != null, "secretKey can not be null");
+      return new AutoValue_DynamoDBIO_DynamoDBConfiguration.Builder()
+          .setEndpointUrl(endpointUrl)
+          .setRegion(region)
+          .setAwsAccessKey(accessKey)
+          .setAwsSecretKey(secretKey)
+          .build();
+    }
+
+    public DynamoDBConfiguration withRegion(String region) {
+      return withRegion(ValueProvider.StaticValueProvider.of(region));
+    }
+
+    public DynamoDBConfiguration withRegion(ValueProvider<String> region) {
+      return builder().setRegion(region).build();
+    }
+
+    public DynamoDBConfiguration withEndpointUrl(String endpointUrl) {
+      return 
withEndpointUrl(ValueProvider.StaticValueProvider.of(endpointUrl));
+    }
+
+    public DynamoDBConfiguration withEndpointUrl(ValueProvider<String> 
endpointUrl) {
+      return builder().setRegion(endpointUrl).build();
+    }
+
+    public DynamoDBConfiguration withAwsAccessKey(String accessKey) {
+      return withAwsAccessKey(ValueProvider.StaticValueProvider.of(accessKey));
+    }
+
+    public DynamoDBConfiguration withAwsAccessKey(ValueProvider<String> 
accessKey) {
+      return builder().setAwsAccessKey(accessKey).build();
+    }
+
+    public DynamoDBConfiguration withAwsSecretKey(String secretKey) {
+      return withAwsSecretKey(ValueProvider.StaticValueProvider.of(secretKey));
+    }
+
+    public DynamoDBConfiguration withAwsSecretKey(ValueProvider<String> 
secretKey) {
+      return builder().setAwsSecretKey(secretKey).build();
+    }
+
+    AmazonDynamoDB buildAmazonDynamoDB() {
+      AmazonDynamoDBClientBuilder builder = 
AmazonDynamoDBClientBuilder.standard();
+      if (getEndpointUrl() != null) {
+        builder.setEndpointConfiguration(
+            new AwsClientBuilder.EndpointConfiguration(getEndpointUrl().get(), 
getRegion().get()));
+      }
+      if (getEndpointUrl() == null && getRegion() != null) {
+        builder.setRegion(getRegion().get());
+      }
+      if (getAwsAccessKey() != null && getAwsSecretKey() != null) {
+        builder.setCredentials(
+            new AWSStaticCredentialsProvider(
+                new BasicAWSCredentials(getAwsAccessKey().get(), 
getAwsSecretKey().get())));
+      }
+      return builder.build();
+    }
+  }
+
+  /** Read data from DynamoDB and return PCollection<Map<String, 
AttributeValue>>. */
+  @AutoValue
+  public abstract static class Read
+      extends PTransform<PBegin, PCollection<Map<String, AttributeValue>>> {
+    @Nullable
+    abstract DynamoDBConfiguration getDynamoDBConfiguration();
+
+    @Nullable
+    abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setDynamoDBConfiguration(DynamoDBConfiguration 
dynamoDBConfiguration);
+
+      abstract Builder setScanRequestFn(SerializableFunction<Void, 
ScanRequest> fn);
+
+      abstract Read build();
+    }
+
+    public Read withDynamoDBConfiguration(DynamoDBConfiguration 
dynamoDBConfiguration) {
+      return 
toBuilder().setDynamoDBConfiguration(dynamoDBConfiguration).build();
+    }
+
+    public Read withScanRequestFn(SerializableFunction<Void, ScanRequest> fn) {
 
 Review comment:
   No, that was what I was afraid of, ok so let's leave it as it is then. Can 
you add the reason why we had to do this to the Javadoc of that method, please? 
Thanks.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 248733)
    Time Spent: 10h 40m  (was: 10.5h)

> Add DynamoDBIO
> --------------
>
>                 Key: BEAM-7043
>                 URL: https://issues.apache.org/jira/browse/BEAM-7043
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-aws
>            Reporter: Cam Mach
>            Assignee: Cam Mach
>            Priority: Minor
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> Currently we don't have any feature to write data to AWS DynamoDB. This 
> feature will enable us to send data to DynamoDB



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to