This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3fdf691 [BEAM-7043] Add DynamoDBIO new 33d7cd0 Merge pull request #8390: [BEAM-7043] Add DynamoDBIO 3fdf691 is described below commit 3fdf691763201a48deab21a9279c27375b63559e Author: Cam Mach <cm...@godaddy.com> AuthorDate: Mon Apr 15 16:19:18 2019 -0700 [BEAM-7043] Add DynamoDBIO --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- sdks/java/io/amazon-web-services/build.gradle | 2 + .../sdk/io/aws/dynamodb/AttributeValueCoder.java | 166 +++++++ .../AttributeValueCoderProviderRegistrar.java | 37 ++ .../sdk/io/aws/dynamodb/AwsClientsProvider.java | 34 ++ .../sdk/io/aws/dynamodb/BasicDynamoDBProvider.java | 75 +++ .../beam/sdk/io/aws/dynamodb/DynamoDBIO.java | 536 +++++++++++++++++++++ .../beam/sdk/io/aws/dynamodb/package-info.java | 19 + .../io/aws/dynamodb/AttributeValueCoderTest.java | 211 ++++++++ .../io/aws/dynamodb/AwsClientsProviderMock.java | 46 ++ .../beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java | 213 ++++++++ .../sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java | 168 +++++++ 12 files changed, 1508 insertions(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 57c1ccf..6bc456f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -424,6 +424,7 @@ class BeamModulePlugin implements Plugin<Project> { avro_tests : "org.apache.avro:avro:1.8.2:tests", aws_java_sdk_cloudwatch : "com.amazonaws:aws-java-sdk-cloudwatch:$aws_java_sdk_version", aws_java_sdk_core : "com.amazonaws:aws-java-sdk-core:$aws_java_sdk_version", + aws_java_sdk_dynamodb : "com.amazonaws:aws-java-sdk-dynamodb:$aws_java_sdk_version", aws_java_sdk_kinesis : "com.amazonaws:aws-java-sdk-kinesis:$aws_java_sdk_version", aws_java_sdk_s3 : 
"com.amazonaws:aws-java-sdk-s3:$aws_java_sdk_version", aws_java_sdk_sns : "com.amazonaws:aws-java-sdk-sns:$aws_java_sdk_version", @@ -565,7 +566,6 @@ class BeamModulePlugin implements Plugin<Project> { url(project.properties['distMgmtSnapshotsUrl'] ?: isRelease(project) ? 'https://repository.apache.org/service/local/staging/deploy/maven2' : 'https://repository.apache.org/content/repositories/snapshots') - // We attempt to find and load credentials from ~/.m2/settings.xml file that a user // has configured with the Apache release and snapshot staging credentials. // <settings> diff --git a/sdks/java/io/amazon-web-services/build.gradle b/sdks/java/io/amazon-web-services/build.gradle index 560e4c5..0bf33dc 100644 --- a/sdks/java/io/amazon-web-services/build.gradle +++ b/sdks/java/io/amazon-web-services/build.gradle @@ -29,6 +29,7 @@ dependencies { compile project(path: ":sdks:java:core", configuration: "shadow") compile library.java.aws_java_sdk_cloudwatch compile library.java.aws_java_sdk_core + compile library.java.aws_java_sdk_dynamodb compile library.java.aws_java_sdk_s3 compile library.java.aws_java_sdk_sns compile library.java.aws_java_sdk_sqs @@ -47,6 +48,7 @@ dependencies { testCompile library.java.mockito_core testCompile library.java.junit testCompile 'org.elasticmq:elasticmq-rest-sqs_2.12:0.14.1' + testCompile 'org.testcontainers:localstack:1.11.2' testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(":runners:direct-java") } diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoder.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoder.java new file mode 100644 index 0000000..4bdf8b5 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoder.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** A {@link Coder} that serializes and deserializes the {@link AttributeValue} objects. */ +public class AttributeValueCoder extends AtomicCoder<AttributeValue> { + + /** Data type of each value type in AttributeValue object. 
*/ + private enum AttributeValueType { + s, // for String + n, // for Number + b, // for Byte + sS, // for List of String + nS, // for List of Number + bS, // for List of Byte + m, // for Map of String and AttributeValue + l, // for list of AttributeValue + bOOL, // for Boolean + nULLValue, // for null + } + + private static final AttributeValueCoder INSTANCE = new AttributeValueCoder(); + + private static final ListCoder<String> LIST_STRING_CODER = ListCoder.of(StringUtf8Coder.of()); + private static final ListCoder<byte[]> LIST_BYTE_CODER = ListCoder.of(ByteArrayCoder.of()); + + private static final ListCoder<AttributeValue> LIST_ATTRIBUTE_CODER = + ListCoder.of(AttributeValueCoder.of()); + private static final MapCoder<String, AttributeValue> MAP_ATTRIBUTE_CODER = + MapCoder.of(StringUtf8Coder.of(), AttributeValueCoder.of()); + + private AttributeValueCoder() {} + + public static AttributeValueCoder of() { + return INSTANCE; + } + + @Override + public void encode(AttributeValue value, OutputStream outStream) throws IOException { + + if (value.getS() != null) { + StringUtf8Coder.of().encode(AttributeValueType.s.toString(), outStream); + StringUtf8Coder.of().encode(value.getS(), outStream); + } else if (value.getN() != null) { + StringUtf8Coder.of().encode(AttributeValueType.n.toString(), outStream); + StringUtf8Coder.of().encode(value.getN(), outStream); + } else if (value.getBOOL() != null) { + StringUtf8Coder.of().encode(AttributeValueType.bOOL.toString(), outStream); + BooleanCoder.of().encode(value.getBOOL(), outStream); + } else if (value.getB() != null) { + StringUtf8Coder.of().encode(AttributeValueType.b.toString(), outStream); + ByteArrayCoder.of().encode(convertToByteArray(value.getB()), outStream); + } else if (value.getSS() != null) { + StringUtf8Coder.of().encode(AttributeValueType.sS.toString(), outStream); + LIST_STRING_CODER.encode(value.getSS(), outStream); + } else if (value.getNS() != null) { + 
StringUtf8Coder.of().encode(AttributeValueType.nS.toString(), outStream); + LIST_STRING_CODER.encode(value.getNS(), outStream); + } else if (value.getBS() != null) { + StringUtf8Coder.of().encode(AttributeValueType.bS.toString(), outStream); + LIST_BYTE_CODER.encode(convertToListByteArray(value.getBS()), outStream); + } else if (value.getL() != null) { + StringUtf8Coder.of().encode(AttributeValueType.l.toString(), outStream); + LIST_ATTRIBUTE_CODER.encode(value.getL(), outStream); + } else if (value.getM() != null) { + StringUtf8Coder.of().encode(AttributeValueType.m.toString(), outStream); + MAP_ATTRIBUTE_CODER.encode(value.getM(), outStream); + } else if (value.getNULL() != null) { + StringUtf8Coder.of().encode(AttributeValueType.nULLValue.toString(), outStream); + BooleanCoder.of().encode(value.getNULL(), outStream); + } else { + throw new CoderException("Unknown Type"); + } + } + + @Override + public AttributeValue decode(InputStream inStream) throws IOException { + AttributeValue attrValue = new AttributeValue(); + + String type = StringUtf8Coder.of().decode(inStream); + AttributeValueType attrType = AttributeValueType.valueOf(type); + + switch (attrType) { + case s: + attrValue.setS(StringUtf8Coder.of().decode(inStream)); + break; + case n: + attrValue.setN(StringUtf8Coder.of().decode(inStream)); + break; + case bOOL: + attrValue.setBOOL(BooleanCoder.of().decode(inStream)); + break; + case b: + attrValue.setB(ByteBuffer.wrap(ByteArrayCoder.of().decode(inStream))); + break; + case sS: + attrValue.setSS(LIST_STRING_CODER.decode(inStream)); + break; + case nS: + attrValue.setNS(LIST_STRING_CODER.decode(inStream)); + break; + case bS: + attrValue.setBS(convertToListByteBuffer(LIST_BYTE_CODER.decode(inStream))); + break; + case l: + attrValue.setL(LIST_ATTRIBUTE_CODER.decode(inStream)); + break; + case m: + attrValue.setM(MAP_ATTRIBUTE_CODER.decode(inStream)); + break; + case nULLValue: + attrValue.setNULL(BooleanCoder.of().decode(inStream)); + break; + default: + 
throw new CoderException("Unknown Type"); + } + + return attrValue; + } + + private List<byte[]> convertToListByteArray(List<ByteBuffer> listByteBuffer) { + return listByteBuffer.stream().map(this::convertToByteArray).collect(Collectors.toList()); + } + + private byte[] convertToByteArray(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + buffer.position(buffer.position() - bytes.length); + return bytes; + } + + private List<ByteBuffer> convertToListByteBuffer(List<byte[]> listByteArr) { + return listByteArr.stream().map(ByteBuffer::wrap).collect(Collectors.toList()); + } +} diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderProviderRegistrar.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderProviderRegistrar.java new file mode 100644 index 0000000..fc4c909 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderProviderRegistrar.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.google.auto.service.AutoService; +import java.util.List; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviderRegistrar; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; + +/** A {@link CoderProviderRegistrar} for standard types used with {@link DynamoDBIO}. */ +@AutoService(CoderProviderRegistrar.class) +public class AttributeValueCoderProviderRegistrar implements CoderProviderRegistrar { + @Override + public List<CoderProvider> getCoderProviders() { + return ImmutableList.of( + CoderProviders.forCoder(TypeDescriptor.of(AttributeValue.class), AttributeValueCoder.of())); + } +} diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProvider.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProvider.java new file mode 100644 index 0000000..8d1c267 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import java.io.Serializable; + +/** + * Provides instances of AWS clients. + * + * <p>Please note, that any instance of {@link AwsClientsProvider} must be {@link Serializable} to + * ensure it can be sent to worker machines. + */ +public interface AwsClientsProvider extends Serializable { + AmazonCloudWatch getCloudWatchClient(); + + AmazonDynamoDB createDynamoDB(); +} diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/BasicDynamoDBProvider.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/BasicDynamoDBProvider.java new file mode 100644 index 0000000..bcb2508 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/BasicDynamoDBProvider.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import javax.annotation.Nullable; + +/** Basic implementation of {@link AwsClientsProvider} used by default in {@link DynamoDBIO}. 
*/ +public class BasicDynamoDBProvider implements AwsClientsProvider { + private final String accessKey; + private final String secretKey; + private final Regions region; + @Nullable private final String serviceEndpoint; + + BasicDynamoDBProvider( + String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) { + checkArgument(accessKey != null, "accessKey can not be null"); + checkArgument(secretKey != null, "secretKey can not be null"); + checkArgument(region != null, "region can not be null"); + this.accessKey = accessKey; + this.secretKey = secretKey; + this.region = region; + this.serviceEndpoint = serviceEndpoint; + } + + private AWSCredentialsProvider getCredentialsProvider() { + return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); + } + + @Override + public AmazonCloudWatch getCloudWatchClient() { + AmazonCloudWatchClientBuilder clientBuilder = + AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider()); + if (serviceEndpoint == null) { + clientBuilder.withRegion(region); + } else { + clientBuilder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName())); + } + return clientBuilder.build(); + } + + @Override + public AmazonDynamoDB createDynamoDB() { + return AmazonDynamoDBClientBuilder.standard() + .withCredentials(getCredentialsProvider()) + .withRegion(region) + .build(); + } +} diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java new file mode 100644 index 0000000..cc28137 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; + +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.metrics.Counter; 
+import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; +import org.apache.http.HttpStatus; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link PTransform}s to read/write from/to <a + * href="https://aws.amazon.com/dynamodb/">DynamoDB</a>. + * + * <h3>Writing to DynamoDB</h3> + * + * <p>Example usage: + * + * <pre>{@code + * PCollection<T> data = ...; + * data.apply( + * DynamoDBIO.<T>write() + * .withWriteRequestMapperFn( + * (SerializableFunction<T, KV<String, WriteRequest>>) + * // Transform your T data into KV<String, WriteRequest> + * t -> KV.of(tableName, writeRequest)) + * .withRetryConfiguration( + * DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1))) + * .withAwsClientsProvider(accessKey, secretKey, region)); + * }</pre> + * + * <p>As a client, you need to provide at least the following things: + * + * <ul> + * <li>Retry configuration + * <li>Specify AwsClientsProvider.
You can pass on the default one, BasicDynamoDBProvider + * <li>Mapper function with a table name to map or transform your object into KV<tableName, + * writeRequest> + * </ul> + * + * <h3>Reading from DynamoDB</h3> + * + * <p>Example usage: + * + * <pre>{@code + * PCollection<List<Map<String, AttributeValue>>> output = + * pipeline.apply( + * DynamoDBIO.<List<Map<String, AttributeValue>>>read() + * .withAwsClientsProvider(accessKey, secretKey, region) + * .withScanRequestFn( + * (SerializableFunction<Void, ScanRequest>) + * input -> new ScanRequest(tableName).withTotalSegments(1)) + * .items()); + * }</pre> + * + * <p>As a client, you need to provide at least the following things: + * + * <ul> + * <li>Specify AwsClientsProvider. You can pass on the default one, BasicDynamoDBProvider + * <li>ScanRequestFn, in which you build a ScanRequest object with at least the table name and the + * total number of segments. Note: this number should be based on the number of your workers + * </ul> + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public final class DynamoDBIO { + public static <T> Read<T> read() { + return new AutoValue_DynamoDBIO_Read.Builder().build(); + } + + public static <T> Write<T> write() { + return new AutoValue_DynamoDBIO_Write.Builder().build(); + } + + /** Read data from DynamoDB and return ScanResult. 
*/ + @AutoValue + public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { + @Nullable + abstract AwsClientsProvider getAwsClientsProvider(); + + @Nullable + abstract SerializableFunction<Void, ScanRequest> getScanRequestFn(); + + @Nullable + abstract Integer getSegmentId(); + + @Nullable + abstract SerializableFunction<ScanResult, T> getScanResultMapperFn(); + + @Nullable + abstract Coder<T> getCoder(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setAwsClientsProvider(AwsClientsProvider awsClientsProvider); + + abstract Builder<T> setScanRequestFn(SerializableFunction<Void, ScanRequest> fn); + + abstract Builder<T> setSegmentId(Integer segmentId); + + abstract Builder<T> setScanResultMapperFn( + SerializableFunction<ScanResult, T> scanResultMapperFn); + + abstract Builder<T> setCoder(Coder<T> coder); + + abstract Read<T> build(); + } + + public Read<T> withAwsClientsProvider(AwsClientsProvider awsClientsProvider) { + return toBuilder().setAwsClientsProvider(awsClientsProvider).build(); + } + + public Read<T> withAwsClientsProvider( + String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) { + return withAwsClientsProvider( + new BasicDynamoDBProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); + } + + public Read<T> withAwsClientsProvider( + String awsAccessKey, String awsSecretKey, Regions region) { + return withAwsClientsProvider(awsAccessKey, awsSecretKey, region, null); + } + + /** + * Can't pass ScanRequest object directly from client since this object is not full + * serializable. 
+ */ + public Read<T> withScanRequestFn(SerializableFunction<Void, ScanRequest> fn) { + return toBuilder().setScanRequestFn(fn).build(); + } + + private Read<T> withSegmentId(Integer segmentId) { + checkArgument(segmentId != null, "segmentId can not be null"); + return toBuilder().setSegmentId(segmentId).build(); + } + + public Read<T> withScanResultMapperFn(SerializableFunction<ScanResult, T> scanResultMapperFn) { + checkArgument(scanResultMapperFn != null, "scanResultMapper can not be null"); + return toBuilder().setScanResultMapperFn(scanResultMapperFn).build(); + } + + public Read<List<Map<String, AttributeValue>>> items() { + return withScanResultMapperFn(new DynamoDBIO.Read.ItemsMapper()) + .withCoder(ListCoder.of(MapCoder.of(StringUtf8Coder.of(), AttributeValueCoder.of()))); + } + + public Read<T> withCoder(Coder<T> coder) { + checkArgument(coder != null, "coder can not be null"); + return toBuilder().setCoder(coder).build(); + } + + @Override + public PCollection<T> expand(PBegin input) { + checkArgument((getScanRequestFn() != null), "withScanRequestFn() is required"); + checkArgument((getAwsClientsProvider() != null), "withAwsClientsProvider() is required"); + ScanRequest scanRequest = getScanRequestFn().apply(null); + checkArgument( + (scanRequest.getTotalSegments() != null && scanRequest.getTotalSegments() > 0), + "TotalSegments is required with withScanRequestFn() and greater zero"); + + PCollection<Read<T>> splits = + (PCollection<Read<T>>) + input.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn())); + splits.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {})); + + PCollection<T> output = + (PCollection<T>) + splits + .apply("Reshuffle", Reshuffle.viaRandomKey()) + .apply("Read", ParDo.of(new ReadFn())); + output.setCoder(getCoder()); + return output; + } + + /** A {@link DoFn} to split {@link Read} elements by segment id. 
*/ + private static class SplitFn<T> extends DoFn<Read<T>, Read<T>> { + @ProcessElement + public void processElement(@Element Read<T> spec, OutputReceiver<Read<T>> out) { + ScanRequest scanRequest = spec.getScanRequestFn().apply(null); + for (int i = 0; i < scanRequest.getTotalSegments(); i++) { + out.output(spec.withSegmentId(i)); + } + } + } + + /** A {@link DoFn} executing the ScanRequest to read from DynamoDB. */ + private static class ReadFn<T> extends DoFn<Read<T>, T> { + @ProcessElement + public void processElement(@Element Read<T> spec, OutputReceiver<T> out) { + AmazonDynamoDB client = spec.getAwsClientsProvider().createDynamoDB(); + ScanRequest scanRequest = spec.getScanRequestFn().apply(null); + scanRequest.setSegment(spec.getSegmentId()); + ScanResult scanResult = client.scan(scanRequest); + out.output(spec.getScanResultMapperFn().apply(scanResult)); + } + } + + static final class ItemsMapper<T> + implements SerializableFunction<ScanResult, List<Map<String, AttributeValue>>> { + @Override + public List<Map<String, AttributeValue>> apply(@Nullable ScanResult scanResult) { + if (scanResult == null) { + return Collections.emptyList(); + } + return scanResult.getItems(); + } + } + } + + /** + * A POJO encapsulating a configuration for retry behavior when issuing requests to dynamodb. 
A + * retry will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes + * first, for any of the following exceptions: + * + * <ul> + * <li>{@link IOException} + * </ul> + */ + @AutoValue + public abstract static class RetryConfiguration implements Serializable { + @VisibleForTesting + static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate(); + + abstract int getMaxAttempts(); + + abstract Duration getMaxDuration(); + + abstract DynamoDBIO.RetryConfiguration.RetryPredicate getRetryPredicate(); + + abstract DynamoDBIO.RetryConfiguration.Builder builder(); + + public static DynamoDBIO.RetryConfiguration create(int maxAttempts, Duration maxDuration) { + checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0"); + checkArgument( + maxDuration != null && maxDuration.isLongerThan(Duration.ZERO), + "maxDuration should be greater than 0"); + return new AutoValue_DynamoDBIO_RetryConfiguration.Builder() + .setMaxAttempts(maxAttempts) + .setMaxDuration(maxDuration) + .setRetryPredicate(DEFAULT_RETRY_PREDICATE) + .build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract DynamoDBIO.RetryConfiguration.Builder setMaxAttempts(int maxAttempts); + + abstract DynamoDBIO.RetryConfiguration.Builder setMaxDuration(Duration maxDuration); + + abstract DynamoDBIO.RetryConfiguration.Builder setRetryPredicate( + RetryPredicate retryPredicate); + + abstract DynamoDBIO.RetryConfiguration build(); + } + + /** + * An interface used to control if we retry the BatchWriteItemRequest call when a {@link + * Throwable} occurs. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries + * to resend the requests to the dynamodb server if the {@link RetryConfiguration} permits it. 
+ */ + @FunctionalInterface + interface RetryPredicate extends Predicate<Throwable>, Serializable {} + + private static class DefaultRetryPredicate implements RetryPredicate { + private static final ImmutableSet<Integer> ELIGIBLE_CODES = + ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE); + + @Override + public boolean test(Throwable throwable) { + return (throwable instanceof IOException + || (throwable instanceof AmazonDynamoDBException) + || (throwable instanceof AmazonDynamoDBException + && ELIGIBLE_CODES.contains(((AmazonDynamoDBException) throwable).getStatusCode()))); + } + } + } + + /** Write a PCollection<T> data into Dynamodb. */ + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PCollection<Void>> { + + @Nullable + abstract AwsClientsProvider getAwsClientsProvider(); + + @Nullable + abstract RetryConfiguration getRetryConfiguration(); + + @Nullable + abstract SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn(); + + abstract Builder<T> builder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + abstract Builder<T> setAwsClientsProvider(AwsClientsProvider awsClientsProvider); + + abstract Builder<T> setRetryConfiguration(RetryConfiguration retryConfiguration); + + abstract Builder<T> setWriteItemMapperFn( + SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn); + + abstract Write<T> build(); + } + + public Write<T> withAwsClientsProvider(AwsClientsProvider awsClientsProvider) { + return builder().setAwsClientsProvider(awsClientsProvider).build(); + } + + public Write<T> withAwsClientsProvider( + String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) { + return withAwsClientsProvider( + new BasicDynamoDBProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); + } + + public Write<T> withAwsClientsProvider( + String awsAccessKey, String awsSecretKey, Regions region) { + return withAwsClientsProvider(awsAccessKey, awsSecretKey, region, 
null); + } + + /** + * Provides configuration to retry a failed request to publish a set of records to DynamoDB. + * Users should consider that retrying might compound the underlying problem which caused the + * initial failure. Users should also be aware that once retrying is exhausted the error is + * surfaced to the runner which <em>may</em> then opt to retry the current partition in entirety + * or abort if the max number of retries of the runner is completed. Retrying uses an + * exponential backoff algorithm, with minimum backoff of 5 seconds and then surfacing the error + * once the maximum number of retries or maximum configuration duration is exceeded. + * + * <p>Example use: + * + * <pre>{@code + * DynamoDBIO.write() + * .withRetryConfiguration(DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)) + * ... + * }</pre> + * + * @param retryConfiguration the rules which govern the retry behavior + * @return the {@link DynamoDBIO.Write} with retrying configured + */ + public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) { + checkArgument(retryConfiguration != null, "retryConfiguration is required"); + return builder().setRetryConfiguration(retryConfiguration).build(); + } + + public Write<T> withWriteRequestMapperFn( + SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn) { + return builder().setWriteItemMapperFn(writeItemMapperFn).build(); + } + + @Override + public PCollection<Void> expand(PCollection<T> input) { + return input.apply(ParDo.of(new WriteFn<>(this))); + } + + static class WriteFn<T> extends DoFn<T, Void> { + @VisibleForTesting + static final String RETRY_ATTEMPT_LOG = "Error writing to DynamoDB. 
Retry attempt[%d]"; + + private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5); + private transient FluentBackoff retryBackoff; // defaults to no retries + private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class); + private static final Counter DYNAMO_DB_WRITE_FAILURES = + Metrics.counter(WriteFn.class, "DynamoDB_Write_Failures"); + + private static final int BATCH_SIZE = 25; + private transient AmazonDynamoDB client; + private final DynamoDBIO.Write spec; + private List<KV<String, WriteRequest>> batch; + + WriteFn(DynamoDBIO.Write spec) { + this.spec = spec; + } + + @Setup + public void setup() { + client = spec.getAwsClientsProvider().createDynamoDB(); + retryBackoff = + FluentBackoff.DEFAULT + .withMaxRetries(0) // default to no retrying + .withInitialBackoff(RETRY_INITIAL_BACKOFF); + if (spec.getRetryConfiguration() != null) { + retryBackoff = + retryBackoff + .withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1) + .withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration()); + } + } + + @StartBundle + public void startBundle(StartBundleContext context) { + batch = new ArrayList<>(); + } + + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + final KV<String, WriteRequest> writeRequest = + (KV<String, WriteRequest>) spec.getWriteItemMapperFn().apply(context.element()); + batch.add(writeRequest); + if (batch.size() >= BATCH_SIZE) { + flushBatch(); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) throws Exception { + flushBatch(); + } + + private void flushBatch() throws IOException, InterruptedException { + if (batch.isEmpty()) { + return; + } + + try { + // Since each element is a KV<tableName, writeRequest> in the batch, we need to group them + // by tableName + Map<String, List<WriteRequest>> mapTableRequest = + batch.stream() + .collect( + Collectors.groupingBy( + KV::getKey, Collectors.mapping(KV::getValue, 
Collectors.toList()))); + + BatchWriteItemRequest batchRequest = new BatchWriteItemRequest(); + mapTableRequest + .entrySet() + .forEach( + entry -> batchRequest.addRequestItemsEntry(entry.getKey(), entry.getValue())); + + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = retryBackoff.backoff(); + int attempt = 0; + while (true) { + attempt++; + try { + client.batchWriteItem(batchRequest); + break; + } catch (Exception ex) { + // Fail right away if there is no retry configuration + if (spec.getRetryConfiguration() == null + || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) { + DYNAMO_DB_WRITE_FAILURES.inc(); + LOG.info( + "Unable to write batch items {} due to {} ", + batchRequest.getRequestItems().entrySet(), + ex); + throw new IOException("Error writing to DyanmoDB (no attempt made to retry)", ex); + } + + if (!BackOffUtils.next(sleeper, backoff)) { + throw new IOException( + String.format( + "Error writing to DyanmoDB after %d attempt(s). No more attempts allowed", + attempt), + ex); + } else { + // Note: this used in test cases to verify behavior + LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex); + } + } + } + } finally { + batch.clear(); + } + } + + @Teardown + public void tearDown() { + if (client != null) { + client.shutdown(); + client = null; + } + } + } + } +} diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/package-info.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/package-info.java new file mode 100644 index 0000000..0a7ea55 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Defines IO connectors for Amazon Web Services DynamoDB. */ +package org.apache.beam.sdk.io.aws.dynamodb; diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderTest.java new file mode 100644 index 0000000..86fbbe3 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AttributeValueCoderTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +/** Unit test cases for each type of AttributeValue to test encoding and decoding. */ +public class AttributeValueCoderTest { + + @Test + public void shouldPassForStringType() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setS("testing"); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForNumberType() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setN("123"); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForBooleanType() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setBOOL(false); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + 
AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForByteArray() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setB(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForListOfString() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setSS(ImmutableList.of("foo", "bar")); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForOneListOfNumber() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setNS(ImmutableList.of("123", "456")); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForOneListOfByteArray() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setBS( + ImmutableList.of( + ByteBuffer.wrap("mylistbyte1".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("mylistbyte2".getBytes(StandardCharsets.UTF_8)))); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream 
output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForListType() throws IOException { + AttributeValue expected = new AttributeValue(); + + List<AttributeValue> listAttr = new ArrayList<>(); + listAttr.add(new AttributeValue("innerMapValue1")); + listAttr.add(new AttributeValue().withN("8976234")); + + expected.setL(listAttr); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForMapType() throws IOException { + AttributeValue expected = new AttributeValue(); + + Map<String, AttributeValue> attrMap = new HashMap<>(); + attrMap.put("innerMapAttr1", new AttributeValue("innerMapValue1")); + attrMap.put( + "innerMapAttr2", + new AttributeValue().withB(ByteBuffer.wrap("8976234".getBytes(StandardCharsets.UTF_8)))); + + expected.setM(attrMap); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } + + @Test + public void shouldPassForNullType() throws IOException { + AttributeValue expected = new AttributeValue(); + expected.setNULL(true); + + AttributeValueCoder coder = AttributeValueCoder.of(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + coder.encode(expected, output); + + ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray()); + + 
AttributeValue actual = coder.decode(in); + + Assert.assertEquals(expected, actual); + } +} diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProviderMock.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProviderMock.java new file mode 100644 index 0000000..dfcf302 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/AwsClientsProviderMock.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import org.mockito.Mockito; + +/** Mocking AwsClientProvider. 
*/ +public class AwsClientsProviderMock implements AwsClientsProvider { + + private static AwsClientsProviderMock instance = new AwsClientsProviderMock(); + private static AmazonDynamoDB db; + + private AwsClientsProviderMock() {} + + public static AwsClientsProviderMock of(AmazonDynamoDB dynamoDB) { + db = dynamoDB; + return instance; + } + + @Override + public AmazonCloudWatch getCloudWatchClient() { + return Mockito.mock(AmazonCloudWatch.class); + } + + @Override + public AmazonDynamoDB createDynamoDB() { + return db; + } +} diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java new file mode 100644 index 0000000..3bd27ed --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +/** Test Coverage for the IO. 
*/ +public class DynamoDBIOTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class); + + private static final String tableName = "TaskA"; + private static final int numOfItems = 10; + + private static List<Map<String, AttributeValue>> expected; + + @BeforeClass + public static void setup() { + DynamoDBIOTestHelper.startServerClient(); + DynamoDBIOTestHelper.createTestTable(tableName); + expected = DynamoDBIOTestHelper.generateTestData(tableName, numOfItems); + } + + @AfterClass + public static void destroy() { + DynamoDBIOTestHelper.stopServerClient(tableName); + } + + // Test cases for Reader. + @Test + public void testReadScanResult() { + PCollection<List<Map<String, AttributeValue>>> actual = + pipeline.apply( + DynamoDBIO.<List<Map<String, AttributeValue>>>read() + .withAwsClientsProvider( + AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())) + .withScanRequestFn( + (SerializableFunction<Void, ScanRequest>) + input -> new ScanRequest(tableName).withTotalSegments(1)) + .items()); + PAssert.that(actual).containsInAnyOrder(expected); + pipeline.run().waitUntilFinish(); + } + + // Test cases for Reader's arguments. 
+ @Test + public void testMissingScanRequestFn() { + thrown.expectMessage("withScanRequestFn() is required"); + pipeline.apply( + DynamoDBIO.read() + .withAwsClientsProvider( + AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))); + try { + pipeline.run().waitUntilFinish(); + fail("withScanRequestFn() is required"); + } catch (IllegalArgumentException ex) { + assertEquals("withScanRequestFn() is required", ex.getMessage()); + } + } + + @Test + public void testMissingAwsClientsProvider() { + thrown.expectMessage("withAwsClientsProvider() is required"); + pipeline.apply( + DynamoDBIO.read() + .withScanRequestFn( + (SerializableFunction<Void, ScanRequest>) + input -> new ScanRequest(tableName).withTotalSegments(3))); + try { + pipeline.run().waitUntilFinish(); + fail("withAwsClientsProvider() is required"); + } catch (IllegalArgumentException ex) { + assertEquals("withAwsClientsProvider() is required", ex.getMessage()); + } + } + + @Test + public void testMissingTotalSegments() { + thrown.expectMessage("TotalSegments is required with withScanRequestFn()"); + pipeline.apply( + DynamoDBIO.read() + .withScanRequestFn( + (SerializableFunction<Void, ScanRequest>) input -> new ScanRequest(tableName)) + .withAwsClientsProvider( + AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))); + try { + pipeline.run().waitUntilFinish(); + fail("TotalSegments is required with withScanRequestFn()"); + } catch (IllegalArgumentException ex) { + assertEquals("TotalSegments is required with withScanRequestFn()", ex.getMessage()); + } + } + + @Test + public void testNegativeTotalSegments() { + thrown.expectMessage("TotalSegments is required with withScanRequestFn() and greater zero"); + pipeline.apply( + DynamoDBIO.read() + .withScanRequestFn( + (SerializableFunction<Void, ScanRequest>) + input -> new ScanRequest(tableName).withTotalSegments(-1)) + .withAwsClientsProvider( + AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))); + try { + 
pipeline.run().waitUntilFinish(); + fail("withTotalSegments() is expected and greater than zero"); + } catch (IllegalArgumentException ex) { + assertEquals( + "TotalSegments is required with withScanRequestFn() and greater zero", ex.getMessage()); + } + } + + // Test cases for Writer. + @Test + public void testWriteDataToDynamo() { + final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems); + + final PCollection<Void> output = + pipeline + .apply(Create.of(writeRequests)) + .apply( + DynamoDBIO.<WriteRequest>write() + .withWriteRequestMapperFn( + (SerializableFunction<WriteRequest, KV<String, WriteRequest>>) + writeRequest -> KV.of(tableName, writeRequest)) + .withRetryConfiguration( + DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1))) + .withAwsClientsProvider( + AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))); + + final PCollection<Long> publishedResultsSize = output.apply(Count.globally()); + PAssert.that(publishedResultsSize).containsInAnyOrder(0L); + + pipeline.run().waitUntilFinish(); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testRetries() throws Throwable { + thrown.expectMessage("Error writing to DynamoDB"); + + final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems); + + AmazonDynamoDB amazonDynamoDBMock = Mockito.mock(AmazonDynamoDB.class); + Mockito.when(amazonDynamoDBMock.batchWriteItem(Mockito.any(BatchWriteItemRequest.class))) + .thenThrow(new AmazonDynamoDBException("Service unavailable")); + + pipeline + .apply(Create.of(writeRequests)) + .apply( + DynamoDBIO.<WriteRequest>write() + .withWriteRequestMapperFn( + (SerializableFunction<WriteRequest, KV<String, WriteRequest>>) + writeRequest -> KV.of(tableName, writeRequest)) + .withRetryConfiguration( + DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10))) + 
.withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock))); + + try { + pipeline.run().waitUntilFinish(); + } catch (final Pipeline.PipelineExecutionException e) { + // check 3 retries were initiated by inspecting the log before passing on the exception + expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1)); + expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 2)); + expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 3)); + throw e.getCause(); + } + fail("Pipeline is expected to fail because we were unable to write to DynamoDB."); + } +} diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java new file mode 100644 index 0000000..6043994 --- /dev/null +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTestHelper.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.aws.dynamodb; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.CreateTableResult; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ListTablesResult; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.PutRequest; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Rule; +import org.testcontainers.containers.localstack.LocalStackContainer; + +/** A utility to generate test table and data for {@link DynamoDBIOTest}. 
*/ +class DynamoDBIOTestHelper implements Serializable { + + @Rule + private static LocalStackContainer localStackContainer = + new LocalStackContainer().withServices(LocalStackContainer.Service.DYNAMODB); + + private static AmazonDynamoDB dynamoDBClient; + + static final String ATTR_NAME_1 = "hashKey1"; + static final String ATTR_NAME_2 = "rangeKey2"; + + static void startServerClient() { + localStackContainer.start(); + + if (dynamoDBClient == null) { + dynamoDBClient = + AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration( + localStackContainer.getEndpointConfiguration( + LocalStackContainer.Service.DYNAMODB)) + .withCredentials(localStackContainer.getDefaultCredentialsProvider()) + .build(); + } + } + + static void stopServerClient(String tableName) { + if (dynamoDBClient != null) { + dynamoDBClient.deleteTable(tableName); + dynamoDBClient.shutdown(); + } + localStackContainer.stop(); + } + + static AmazonDynamoDB getDynamoDBClient() { + // Note: each test case got to have their own dynamo client obj, can't be shared + // Otherwise will run into connection pool issue + return AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration( + localStackContainer.getEndpointConfiguration(LocalStackContainer.Service.DYNAMODB)) + .withCredentials(localStackContainer.getDefaultCredentialsProvider()) + .build(); + } + + static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) { + BatchWriteItemRequest batchWriteItemRequest = + generateBatchWriteItemRequest(tableName, numOfItems); + + dynamoDBClient.batchWriteItem(batchWriteItemRequest); + ScanResult scanResult = dynamoDBClient.scan(new ScanRequest().withTableName(tableName)); + + List<Map<String, AttributeValue>> items = scanResult.getItems(); + Assert.assertEquals(numOfItems, items.size()); + return items; + } + + static BatchWriteItemRequest generateBatchWriteItemRequest(String tableName, int numOfItems) { + BatchWriteItemRequest batchWriteItemRequest = new 
BatchWriteItemRequest(); + batchWriteItemRequest.addRequestItemsEntry(tableName, generateWriteRequests(numOfItems)); + return batchWriteItemRequest; + } + + static List<WriteRequest> generateWriteRequests(int numOfItem) { + List<WriteRequest> writeRequests = new ArrayList<>(); + for (int i = 1; i <= numOfItem; i++) { + WriteRequest writeRequest = new WriteRequest(); + writeRequest.setPutRequest(generatePutRequest("hashKeyDataStr_" + i, "1000" + i)); + writeRequests.add(writeRequest); + } + return writeRequests; + } + + private static PutRequest generatePutRequest(String hashKeyData, String rangeKeyData) { + PutRequest putRequest = new PutRequest(); + putRequest.addItemEntry(ATTR_NAME_1, new AttributeValue(hashKeyData)); + putRequest.addItemEntry(ATTR_NAME_2, new AttributeValue().withN(rangeKeyData)); + return putRequest; + } + + static void createTestTable(String tableName) { + CreateTableResult res = createDynamoTable(tableName); + + TableDescription tableDesc = res.getTableDescription(); + + Assert.assertEquals(tableName, tableDesc.getTableName()); + Assert.assertTrue(tableDesc.getKeySchema().toString().contains(ATTR_NAME_1)); + Assert.assertTrue(tableDesc.getKeySchema().toString().contains(ATTR_NAME_2)); + + Assert.assertEquals( + tableDesc.getProvisionedThroughput().getReadCapacityUnits(), Long.valueOf(1000)); + Assert.assertEquals( + tableDesc.getProvisionedThroughput().getWriteCapacityUnits(), Long.valueOf(1000)); + Assert.assertEquals("ACTIVE", tableDesc.getTableStatus()); + Assert.assertEquals( + "arn:aws:dynamodb:us-east-1:000000000000:table/" + tableName, tableDesc.getTableArn()); + + ListTablesResult tables = dynamoDBClient.listTables(); + Assert.assertEquals(1, tables.getTableNames().size()); + } + + private static CreateTableResult createDynamoTable(String tableName) { + + ImmutableList<AttributeDefinition> attributeDefinitions = + ImmutableList.of( + new AttributeDefinition(ATTR_NAME_1, ScalarAttributeType.S), + new AttributeDefinition(ATTR_NAME_2, 
ScalarAttributeType.N)); + + ImmutableList<KeySchemaElement> ks = + ImmutableList.of( + new KeySchemaElement(ATTR_NAME_1, KeyType.HASH), + new KeySchemaElement(ATTR_NAME_2, KeyType.RANGE)); + + ProvisionedThroughput provisionedthroughput = new ProvisionedThroughput(1000L, 1000L); + CreateTableRequest request = + new CreateTableRequest() + .withTableName(tableName) + .withAttributeDefinitions(attributeDefinitions) + .withKeySchema(ks) + .withProvisionedThroughput(provisionedthroughput); + + return dynamoDBClient.createTable(request); + } +}