[ 
https://issues.apache.org/jira/browse/BEAM-4823?focusedWorklogId=137588&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137588
 ]

ASF GitHub Bot logged work on BEAM-4823:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Aug/18 21:37
            Start Date: 23/Aug/18 21:37
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #6202: [BEAM-4823] - Adds a 
Sink to write to Amazon's SNS
URL: https://github.com/apache/beam/pull/6202
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/amazon-web-services/build.gradle 
b/sdks/java/io/amazon-web-services/build.gradle
index 3ea755966cd..23ed8b65e43 100644
--- a/sdks/java/io/amazon-web-services/build.gradle
+++ b/sdks/java/io/amazon-web-services/build.gradle
@@ -30,8 +30,11 @@ dependencies {
   compile library.java.guava
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow "com.amazonaws:aws-java-sdk-core:$aws_java_sdk_version"
+  shadow "com.amazonaws:aws-java-sdk-cloudwatch:$aws_java_sdk_version"
   shadow "com.amazonaws:aws-java-sdk-s3:$aws_java_sdk_version"
   shadow "com.amazonaws:aws-java-sdk-sqs:$aws_java_sdk_version"
+  shadow "com.amazonaws:aws-java-sdk-sns:$aws_java_sdk_version"
+  shadow "commons-lang:commons-lang:2.6"
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
   shadow library.java.jackson_databind
@@ -39,6 +42,8 @@ dependencies {
   runtime 'commons-codec:commons-codec:1.9'
   runtime "org.apache.httpcomponents:httpclient:4.5.6"
   testCompile project(path: ":beam-runners-direct-java", configuration: 
"shadow")
+  testCompile project(path: ":beam-sdks-java-core", configuration: 
"shadowTest")
+  testCompile project(path: ":beam-sdks-java-io-common", configuration: 
"shadow")
   shadowTest library.java.guava_testlib
   shadowTest library.java.hamcrest_core
   shadowTest library.java.hamcrest_library
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/AwsClientsProvider.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/AwsClientsProvider.java
new file mode 100644
index 00000000000..dd11daa6431
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/AwsClientsProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.sns;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.sns.AmazonSNS;
+import java.io.Serializable;
+
+/**
+ * Provides instances of AWS clients.
+ *
+ * <p>Please note, that any instance of {@link AwsClientsProvider} must be 
{@link Serializable} to
+ * ensure it can be sent to worker machines.
+ */
+public interface AwsClientsProvider extends Serializable {
+  AmazonCloudWatch getCloudWatchClient();
+
+  AmazonSNS createSnsPublisher();
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/BasicSnsProvider.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/BasicSnsProvider.java
new file mode 100644
index 00000000000..c2481fc8bcc
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/BasicSnsProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.sns;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
+import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.AmazonSNSClientBuilder;
+import javax.annotation.Nullable;
+
+/** Basic implementation of {@link AwsClientsProvider} used by default in 
{@link SnsIO}. */
+class BasicSnsProvider implements AwsClientsProvider {
+
+  private final String accessKey;
+  private final String secretKey;
+  private final Regions region;
+  @Nullable private final String serviceEndpoint;
+
+  BasicSnsProvider(
+      String accessKey, String secretKey, Regions region, @Nullable String 
serviceEndpoint) {
+    checkArgument(accessKey != null, "accessKey can not be null");
+    checkArgument(secretKey != null, "secretKey can not be null");
+    checkArgument(region != null, "region can not be null");
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+    this.region = region;
+    this.serviceEndpoint = serviceEndpoint;
+  }
+
+  private AWSCredentialsProvider getCredentialsProvider() {
+    return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, 
secretKey));
+  }
+
+  @Override
+  public AmazonCloudWatch getCloudWatchClient() {
+    AmazonCloudWatchClientBuilder clientBuilder =
+        
AmazonCloudWatchClientBuilder.standard().withCredentials(getCredentialsProvider());
+    if (serviceEndpoint == null) {
+      clientBuilder.withRegion(region);
+    } else {
+      clientBuilder.withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, 
region.getName()));
+    }
+    return clientBuilder.build();
+  }
+
+  @Override
+  public AmazonSNS createSnsPublisher() {
+    return AmazonSNSClientBuilder.standard()
+        .withCredentials(getCredentialsProvider())
+        .withRegion(region)
+        .build();
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoder.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoder.java
new file mode 100644
index 00000000000..a49cf073f44
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/PublishResultCoder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.sns;
+
+import com.amazonaws.services.sns.model.PublishResult;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/** Custom Coder for handling publish result. */
+public class PublishResultCoder extends Coder<PublishResult> implements 
Serializable {
+  private static final PublishResultCoder INSTANCE = new PublishResultCoder();
+
+  private PublishResultCoder() {}
+
+  static PublishResultCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(PublishResult value, OutputStream outStream)
+      throws CoderException, IOException {
+    StringUtf8Coder.of().encode(value.getMessageId(), outStream);
+  }
+
+  @Override
+  public PublishResult decode(InputStream inStream) throws CoderException, 
IOException {
+    final String messageId = StringUtf8Coder.of().decode(inStream);
+    return new PublishResult().withMessageId(messageId);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    StringUtf8Coder.of().verifyDeterministic();
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsCoderProviderRegistrar.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsCoderProviderRegistrar.java
new file mode 100644
index 00000000000..bccf828dc1a
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsCoderProviderRegistrar.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.sns;
+
+import com.amazonaws.services.sns.model.PublishResult;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A {@link CoderProviderRegistrar} for standard types used with {@link 
SnsIO}. */
+@AutoService(CoderProviderRegistrar.class)
+public class SnsCoderProviderRegistrar implements CoderProviderRegistrar {
+  @Override
+  public List<CoderProvider> getCoderProviders() {
+    return ImmutableList.of(
+        CoderProviders.forCoder(TypeDescriptor.of(PublishResult.class), 
PublishResultCoder.of()));
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
new file mode 100644
index 00000000000..04ad120d004
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.sns;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.model.GetTopicAttributesResult;
+import com.amazonaws.services.sns.model.InternalErrorException;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.http.HttpStatus;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for writing to <a 
href="https://aws.amazon.com/sns/">SNS</a>.
+ *
+ * <h3>Writing to SNS</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<PublishRequest> data = ...;
+ *
+ * data.apply(SnsIO.write()
+ *     .withTopicName("topicName")
+ *     .withRetryConfiguration(
+ *        SnsIO.RetryConfiguration.create(
+ *          4, org.joda.time.Duration.standardSeconds(10)))
+ *     .withAWSClientsProvider(accessKey, secretKey, region)
+ *     .withResultOutputTag(results));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ *   <li>name of the SNS topic you're going to write to
+ *   <li>retry configuration
+ *   <li>an AwsClientsProvider; the default implementation is BasicSnsProvider
+ *   <li>an output tag where you can get results. Example in SnsIOTest
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class SnsIO {
+
+  // Write data to SNS.
+  public static Write write() {
+    return new AutoValue_SnsIO_Write.Builder().build();
+  }
+
+  /**
+   * A POJO encapsulating a configuration for retry behavior when issuing 
requests to SNS. A retry
+   * will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes first, for
+   * any of the following exceptions:
+   *
+   * <ul>
+   *   <li>{@link IOException}
+   * </ul>
+   */
+  @AutoValue
+  public abstract static class RetryConfiguration implements Serializable {
+    @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new 
DefaultRetryPredicate();
+
+    abstract int getMaxAttempts();
+
+    abstract Duration getMaxDuration();
+
+    abstract RetryPredicate getRetryPredicate();
+
+    abstract Builder builder();
+
+    public static RetryConfiguration create(int maxAttempts, Duration 
maxDuration) {
+      checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
+      checkArgument(
+          maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+          "maxDuration should be greater than 0");
+      return new AutoValue_SnsIO_RetryConfiguration.Builder()
+          .setMaxAttempts(maxAttempts)
+          .setMaxDuration(maxDuration)
+          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+          .build();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract SnsIO.RetryConfiguration.Builder setMaxAttempts(int 
maxAttempts);
+
+      abstract SnsIO.RetryConfiguration.Builder setMaxDuration(Duration 
maxDuration);
+
+      abstract SnsIO.RetryConfiguration.Builder 
setRetryPredicate(RetryPredicate retryPredicate);
+
+      abstract SnsIO.RetryConfiguration build();
+    }
+
+    /**
+     * An interface used to control if we retry the SNS Publish call when a 
{@link Throwable}
+     * occurs. If {@link RetryPredicate#test(Object)} returns true, {@link 
Write} tries to resend
+     * the requests to SNS if the {@link RetryConfiguration} 
permits it.
+     */
+    @FunctionalInterface
+    interface RetryPredicate extends Predicate<Throwable>, Serializable {}
+
+    private static class DefaultRetryPredicate implements RetryPredicate {
+      private static final ImmutableSet<Integer> ELIGIBLE_CODES =
+          ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE);
+
+      @Override
+      public boolean test(Throwable throwable) {
+        return (throwable instanceof IOException
+            || (throwable instanceof InternalErrorException)
+            || (throwable instanceof InternalErrorException
+                && ELIGIBLE_CODES.contains(((InternalErrorException) 
throwable).getStatusCode())));
+      }
+    }
+  }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write
+      extends PTransform<PCollection<PublishRequest>, PCollectionTuple> {
+    @Nullable
+    abstract String getTopicName();
+
+    @Nullable
+    abstract AwsClientsProvider getAWSClientsProvider();
+
+    @Nullable
+    abstract RetryConfiguration getRetryConfiguration();
+
+    @Nullable
+    abstract TupleTag<PublishResult> getResultOutputTag();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setTopicName(String topicName);
+
+      abstract Builder setAWSClientsProvider(AwsClientsProvider 
clientProvider);
+
+      abstract Builder setRetryConfiguration(RetryConfiguration 
retryConfiguration);
+
+      abstract Builder setResultOutputTag(TupleTag<PublishResult> results);
+
+      abstract Write build();
+    }
+
+    /**
+     * Specify the SNS topic which will be used for writing, this name is 
mandatory.
+     *
+     * @param topicName topicName
+     */
+    public Write withTopicName(String topicName) {
+      return builder().setTopicName(topicName).build();
+    }
+
+    /**
+     * Allows to specify custom {@link AwsClientsProvider}. {@link 
AwsClientsProvider} creates new
+     * {@link AmazonSNS} which is later used for writing to a SNS topic.
+     */
+    public Write withAWSClientsProvider(AwsClientsProvider awsClientsProvider) 
{
+      return builder().setAWSClientsProvider(awsClientsProvider).build();
+    }
+
+    /**
+     * Specify credential details and region to be used to write to SNS. If 
you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Write#withAWSClientsProvider(AwsClientsProvider)}.
+     */
+    public Write withAWSClientsProvider(String awsAccessKey, String 
awsSecretKey, Regions region) {
+      return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
+    }
+
+    /**
+     * Specify credential details and region to be used to write to SNS. If 
you need more
+     * sophisticated credential protocol, then you should look at {@link
+     * Write#withAWSClientsProvider(AwsClientsProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
+     * the tests with an AWS service emulator.
+     */
+    public Write withAWSClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region, String 
serviceEndpoint) {
+      return withAWSClientsProvider(
+          new BasicSnsProvider(awsAccessKey, awsSecretKey, region, 
serviceEndpoint));
+    }
+
+    /**
+     * Provides configuration to retry a failed request to publish a message 
to SNS. Users should
+     * consider that retrying might compound the underlying problem which 
caused the initial
+     * failure. Users should also be aware that once retrying is exhausted the 
error is surfaced to
+     * the runner which <em>may</em> then opt to retry the current partition 
in entirety or abort if
+     * the max number of retries of the runner is completed. Retrying uses an 
exponential backoff
+     * algorithm, with minimum backoff of 5 seconds and then surfacing the 
error once the maximum
+     * number of retries or maximum configuration duration is exceeded.
+     *
+     * <p>Example use:
+     *
+     * <pre>{@code
+     * SnsIO.write()
+     *   .withRetryConfiguration(SnsIO.RetryConfiguration.create(5, 
Duration.standardMinutes(1))
+     *   ...
+     * }</pre>
+     *
+     * @param retryConfiguration the rules which govern the retry behavior
+     * @return the {@link Write} with retrying configured
+     */
+    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+      checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
+      return builder().setRetryConfiguration(retryConfiguration).build();
+    }
+
+    /** Tuple tag to store results. Mandatory field. */
+    public Write withResultOutputTag(TupleTag<PublishResult> results) {
+      return builder().setResultOutputTag(results).build();
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<PublishRequest> input) {
+      checkArgument(getTopicName() != null, "withTopicName() is required");
+      return input.apply(
+          ParDo.of(new SnsWriterFn(this))
+              .withOutputTags(getResultOutputTag(), TupleTagList.empty()));
+    }
+
+    static class SnsWriterFn extends DoFn<PublishRequest, PublishResult> {
+      @VisibleForTesting
+      static final String RETRY_ATTEMPT_LOG = "Error writing to SNS. Retry 
attempt[%d]";
+
+      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
+      private transient FluentBackoff retryBackoff; // defaults to no retries
+      private static final Logger LOG = 
LoggerFactory.getLogger(SnsWriterFn.class);
+      private static final Counter SNS_WRITE_FAILURES =
+          Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");
+
+      private final SnsIO.Write spec;
+      private transient AmazonSNS producer;
+
+      SnsWriterFn(SnsIO.Write spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        //Initialize SnsPublisher
+        producer = spec.getAWSClientsProvider().createSnsPublisher();
+        checkArgument(
+            topicExists(producer, spec.getTopicName()),
+            "Topic %s does not exist",
+            spec.getTopicName());
+
+        retryBackoff =
+            FluentBackoff.DEFAULT
+                .withMaxRetries(0) // default to no retrying
+                .withInitialBackoff(RETRY_INITIAL_BACKOFF);
+        if (spec.getRetryConfiguration() != null) {
+          retryBackoff =
+              retryBackoff
+                  
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
+                  
.withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        PublishRequest request = context.element();
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = retryBackoff.backoff();
+        int attempt = 0;
+        while (true) {
+          attempt++;
+          try {
+            PublishResult pr = producer.publish(request);
+            context.output(pr);
+            break;
+          } catch (Exception ex) {
+            // Fail right away if there is no retry configuration
+            if (spec.getRetryConfiguration() == null
+                || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) 
{
+              SNS_WRITE_FAILURES.inc();
+              LOG.info("Unable to publish message {} due to {} ", 
request.getMessage(), ex);
+              throw new IOException("Error writing to SNS (no attempt made to 
retry)", ex);
+            }
+
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              throw new IOException(
+                  String.format(
+                      "Error writing to SNS after %d attempt(s). No more 
attempts allowed",
+                      attempt),
+                  ex);
+            } else {
+              // Note: this is used in test cases to verify behavior
+              LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
+            }
+          }
+        }
+      }
+
+      @Teardown
+      public void tearDown() {
+        if (producer != null) {
+          producer.shutdown();
+          producer = null;
+        }
+      }
+
+      @SuppressWarnings({"checkstyle:illegalCatch"})
+      private static boolean topicExists(AmazonSNS client, String topicName) {
+        try {
+          GetTopicAttributesResult topicAttributesResult = 
client.getTopicAttributes(topicName);
+          return topicAttributesResult != null
+              && 
topicAttributesResult.getSdkHttpMetadata().getHttpStatusCode() == 200;
+        } catch (Exception e) {
+          LOG.warn("Error checking whether topic {} exists.", topicName, e);
+          throw e;
+        }
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/package-info.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/package-info.java
new file mode 100644
index 00000000000..a1895cf4ce6
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Defines IO connectors for Amazon Web Services SNS. */
+package org.apache.beam.sdk.io.aws.sns;
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleAmazonSNSMock.java
similarity index 99%
rename from 
sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
rename to 
sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleAmazonSNSMock.java
index 04055f693a0..c7c9e571383 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleAmazonSNSMock.java
@@ -47,7 +47,7 @@
 
 /** Tests {@link AwsModule}. */
 @RunWith(JUnit4.class)
-public class AwsModuleTest {
+public class AwsModuleAmazonSNSMock {
 
   private final ObjectMapper objectMapper = new 
ObjectMapper().registerModule(new AwsModule());
 
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index 0abf2170a36..d34520c45a0 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -75,7 +75,7 @@
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentMatcher;
 
-/** Test case for {@link S3FileSystem}. */
+/** AmazonSNSMock case for {@link S3FileSystem}. */
 @RunWith(JUnit4.class)
 public class S3FileSystemTest {
   private static S3Mock api;
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMock.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMock.java
new file mode 100644
index 00000000000..3431e3a9127
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMock.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.sns;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.model.AddPermissionRequest;
+import com.amazonaws.services.sns.model.AddPermissionResult;
+import com.amazonaws.services.sns.model.CheckIfPhoneNumberIsOptedOutRequest;
+import com.amazonaws.services.sns.model.CheckIfPhoneNumberIsOptedOutResult;
+import com.amazonaws.services.sns.model.ConfirmSubscriptionRequest;
+import com.amazonaws.services.sns.model.ConfirmSubscriptionResult;
+import com.amazonaws.services.sns.model.CreatePlatformApplicationRequest;
+import com.amazonaws.services.sns.model.CreatePlatformApplicationResult;
+import com.amazonaws.services.sns.model.CreatePlatformEndpointRequest;
+import com.amazonaws.services.sns.model.CreatePlatformEndpointResult;
+import com.amazonaws.services.sns.model.CreateTopicRequest;
+import com.amazonaws.services.sns.model.CreateTopicResult;
+import com.amazonaws.services.sns.model.DeleteEndpointRequest;
+import com.amazonaws.services.sns.model.DeleteEndpointResult;
+import com.amazonaws.services.sns.model.DeletePlatformApplicationRequest;
+import com.amazonaws.services.sns.model.DeletePlatformApplicationResult;
+import com.amazonaws.services.sns.model.DeleteTopicRequest;
+import com.amazonaws.services.sns.model.DeleteTopicResult;
+import com.amazonaws.services.sns.model.GetEndpointAttributesRequest;
+import com.amazonaws.services.sns.model.GetEndpointAttributesResult;
+import 
com.amazonaws.services.sns.model.GetPlatformApplicationAttributesRequest;
+import com.amazonaws.services.sns.model.GetPlatformApplicationAttributesResult;
+import com.amazonaws.services.sns.model.GetSMSAttributesRequest;
+import com.amazonaws.services.sns.model.GetSMSAttributesResult;
+import com.amazonaws.services.sns.model.GetSubscriptionAttributesRequest;
+import com.amazonaws.services.sns.model.GetSubscriptionAttributesResult;
+import com.amazonaws.services.sns.model.GetTopicAttributesRequest;
+import com.amazonaws.services.sns.model.GetTopicAttributesResult;
+import 
com.amazonaws.services.sns.model.ListEndpointsByPlatformApplicationRequest;
+import 
com.amazonaws.services.sns.model.ListEndpointsByPlatformApplicationResult;
+import com.amazonaws.services.sns.model.ListPhoneNumbersOptedOutRequest;
+import com.amazonaws.services.sns.model.ListPhoneNumbersOptedOutResult;
+import com.amazonaws.services.sns.model.ListPlatformApplicationsRequest;
+import com.amazonaws.services.sns.model.ListPlatformApplicationsResult;
+import com.amazonaws.services.sns.model.ListSubscriptionsByTopicRequest;
+import com.amazonaws.services.sns.model.ListSubscriptionsByTopicResult;
+import com.amazonaws.services.sns.model.ListSubscriptionsRequest;
+import com.amazonaws.services.sns.model.ListSubscriptionsResult;
+import com.amazonaws.services.sns.model.ListTopicsRequest;
+import com.amazonaws.services.sns.model.ListTopicsResult;
+import com.amazonaws.services.sns.model.OptInPhoneNumberRequest;
+import com.amazonaws.services.sns.model.OptInPhoneNumberResult;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.RemovePermissionRequest;
+import com.amazonaws.services.sns.model.RemovePermissionResult;
+import com.amazonaws.services.sns.model.SetEndpointAttributesRequest;
+import com.amazonaws.services.sns.model.SetEndpointAttributesResult;
+import 
com.amazonaws.services.sns.model.SetPlatformApplicationAttributesRequest;
+import com.amazonaws.services.sns.model.SetPlatformApplicationAttributesResult;
+import com.amazonaws.services.sns.model.SetSMSAttributesRequest;
+import com.amazonaws.services.sns.model.SetSMSAttributesResult;
+import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;
+import com.amazonaws.services.sns.model.SetSubscriptionAttributesResult;
+import com.amazonaws.services.sns.model.SetTopicAttributesRequest;
+import com.amazonaws.services.sns.model.SetTopicAttributesResult;
+import com.amazonaws.services.sns.model.SubscribeRequest;
+import com.amazonaws.services.sns.model.SubscribeResult;
+import com.amazonaws.services.sns.model.UnsubscribeRequest;
+import com.amazonaws.services.sns.model.UnsubscribeResult;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import org.mockito.Mockito;
+
+/**
+ * Base test double for {@link AmazonSNS}.
+ *
+ * <p>Only {@link #createTopic(String)} and {@link #getTopicAttributes(String)} return canned
+ * results; {@link #setEndpoint(String)}, {@link #setRegion(Region)} and {@link #shutdown()} are
+ * no-ops. Every other operation declared here throws a {@link RuntimeException} with the message
+ * {@code "Not implemented"}. The class is abstract: concrete subclasses supply the behaviour of
+ * any remaining {@link AmazonSNS} operation that is not declared here.
+ */
+public abstract class AmazonSNSMock implements AmazonSNS, Serializable {
+
+  public AmazonSNSMock() {}
+
+  /** No-op: the mock ignores the endpoint. */
+  @Override
+  public void setEndpoint(String endpoint) {}
+
+  /** No-op: the mock ignores the region. */
+  @Override
+  public void setRegion(Region region) {}
+
+  @Override
+  public AddPermissionResult addPermission(AddPermissionRequest addPermissionRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public AddPermissionResult addPermission(
+      String topicArn, String label, List<String> aWSAccountIds, List<String> actionNames) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CheckIfPhoneNumberIsOptedOutResult checkIfPhoneNumberIsOptedOut(
+      CheckIfPhoneNumberIsOptedOutRequest checkIfPhoneNumberIsOptedOutRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ConfirmSubscriptionResult confirmSubscription(
+      ConfirmSubscriptionRequest confirmSubscriptionRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ConfirmSubscriptionResult confirmSubscription(
+      String topicArn, String token, String authenticateOnUnsubscribe) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ConfirmSubscriptionResult confirmSubscription(String topicArn, String token) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreatePlatformApplicationResult createPlatformApplication(
+      CreatePlatformApplicationRequest createPlatformApplicationRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreatePlatformEndpointResult createPlatformEndpoint(
+      CreatePlatformEndpointRequest createPlatformEndpointRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreateTopicResult createTopic(CreateTopicRequest createTopicRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** Returns a result whose topic ARN is simply the supplied {@code name}. */
+  @Override
+  public CreateTopicResult createTopic(String name) {
+    return new CreateTopicResult().withTopicArn(name);
+  }
+
+  @Override
+  public DeleteEndpointResult deleteEndpoint(DeleteEndpointRequest deleteEndpointRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeletePlatformApplicationResult deletePlatformApplication(
+      DeletePlatformApplicationRequest deletePlatformApplicationRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeleteTopicResult deleteTopic(DeleteTopicRequest deleteTopicRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeleteTopicResult deleteTopic(String topicArn) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetEndpointAttributesResult getEndpointAttributes(
+      GetEndpointAttributesRequest getEndpointAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetPlatformApplicationAttributesResult getPlatformApplicationAttributes(
+      GetPlatformApplicationAttributesRequest getPlatformApplicationAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetSMSAttributesResult getSMSAttributes(GetSMSAttributesRequest getSMSAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetSubscriptionAttributesResult getSubscriptionAttributes(
+      GetSubscriptionAttributesRequest getSubscriptionAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetSubscriptionAttributesResult getSubscriptionAttributes(String subscriptionArn) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetTopicAttributesResult getTopicAttributes(
+      GetTopicAttributesRequest getTopicAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /**
+   * Returns a Mockito-backed result whose HTTP metadata reports status code 200 and an empty
+   * header map. The {@code topicArn} argument is not inspected.
+   */
+  @Override
+  public GetTopicAttributesResult getTopicAttributes(String topicArn) {
+    GetTopicAttributesResult result = Mockito.mock(GetTopicAttributesResult.class);
+    SdkHttpMetadata metadata = Mockito.mock(SdkHttpMetadata.class);
+    Mockito.when(metadata.getHttpHeaders()).thenReturn(new HashMap<>());
+    Mockito.when(metadata.getHttpStatusCode()).thenReturn(200);
+    Mockito.when(result.getSdkHttpMetadata()).thenReturn(metadata);
+    return result;
+  }
+
+  @Override
+  public ListEndpointsByPlatformApplicationResult listEndpointsByPlatformApplication(
+      ListEndpointsByPlatformApplicationRequest listEndpointsByPlatformApplicationRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListPhoneNumbersOptedOutResult listPhoneNumbersOptedOut(
+      ListPhoneNumbersOptedOutRequest listPhoneNumbersOptedOutRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListPlatformApplicationsResult listPlatformApplications(
+      ListPlatformApplicationsRequest listPlatformApplicationsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListPlatformApplicationsResult listPlatformApplications() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListSubscriptionsResult listSubscriptions(
+      ListSubscriptionsRequest listSubscriptionsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListSubscriptionsResult listSubscriptions() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListSubscriptionsResult listSubscriptions(String nextToken) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListSubscriptionsByTopicResult listSubscriptionsByTopic(
+      ListSubscriptionsByTopicRequest listSubscriptionsByTopicRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListSubscriptionsByTopicResult listSubscriptionsByTopic(String topicArn) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListSubscriptionsByTopicResult listSubscriptionsByTopic(
+      String topicArn, String nextToken) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListTopicsResult listTopics(ListTopicsRequest listTopicsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListTopicsResult listTopics() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListTopicsResult listTopics(String nextToken) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public OptInPhoneNumberResult optInPhoneNumber(OptInPhoneNumberRequest optInPhoneNumberRequest) {
+    // Fail fast like every other unsupported operation instead of handing back null, which
+    // would only surface later as a NullPointerException far from the offending call.
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PublishResult publish(String topicArn, String message) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PublishResult publish(String topicArn, String message, String subject) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public RemovePermissionResult removePermission(RemovePermissionRequest removePermissionRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public RemovePermissionResult removePermission(String topicArn, String label) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetEndpointAttributesResult setEndpointAttributes(
+      SetEndpointAttributesRequest setEndpointAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetPlatformApplicationAttributesResult setPlatformApplicationAttributes(
+      SetPlatformApplicationAttributesRequest setPlatformApplicationAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetSMSAttributesResult setSMSAttributes(SetSMSAttributesRequest setSMSAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetSubscriptionAttributesResult setSubscriptionAttributes(
+      SetSubscriptionAttributesRequest setSubscriptionAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetSubscriptionAttributesResult setSubscriptionAttributes(
+      String subscriptionArn, String attributeName, String attributeValue) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetTopicAttributesResult setTopicAttributes(
+      SetTopicAttributesRequest setTopicAttributesRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SetTopicAttributesResult setTopicAttributes(
+      String topicArn, String attributeName, String attributeValue) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SubscribeResult subscribe(SubscribeRequest subscribeRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SubscribeResult subscribe(String topicArn, String protocol, String endpoint) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public UnsubscribeResult unsubscribe(UnsubscribeRequest unsubscribeRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public UnsubscribeResult unsubscribe(String subscriptionArn) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** No-op: the mock holds no resources to release. */
+  @Override
+  public void shutdown() {}
+
+  @Override
+  public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+    throw new RuntimeException("Not implemented");
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockErrors.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockErrors.java
new file mode 100644
index 00000000000..8c6b4981dca
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockErrors.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.sns;
+
+import com.amazonaws.services.sns.model.InternalErrorException;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+
+/** {@link AmazonSNSMock} variant whose publish call always fails. */
+public class AmazonSNSMockErrors extends AmazonSNSMock {
+
+  /** Rejects every request by throwing an {@link InternalErrorException}. */
+  @Override
+  public PublishResult publish(PublishRequest publishRequest) {
+    final InternalErrorException failure = new InternalErrorException("Service unavailable");
+    throw failure;
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockSuccess.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockSuccess.java
new file mode 100644
index 00000000000..c4f51c0cb7a
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/AmazonSNSMockSuccess.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.sns;
+
+import com.amazonaws.http.SdkHttpMetadata;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import java.util.HashMap;
+import java.util.UUID;
+import org.mockito.Mockito;
+
+/** {@link AmazonSNSMock} variant whose publish call always succeeds. */
+public class AmazonSNSMockSuccess extends AmazonSNSMock {
+
+  /**
+   * Returns a Mockito-backed {@link PublishResult} carrying a random message id and HTTP metadata
+   * that reports status code 200 with an empty header map. The request itself is not inspected.
+   */
+  @Override
+  public PublishResult publish(PublishRequest publishRequest) {
+    // Stub the HTTP-level metadata first ...
+    final SdkHttpMetadata httpMetadata = Mockito.mock(SdkHttpMetadata.class);
+    Mockito.when(httpMetadata.getHttpHeaders()).thenReturn(new HashMap<>());
+    Mockito.when(httpMetadata.getHttpStatusCode()).thenReturn(200);
+
+    // ... then attach it, together with a fresh message id, to the mocked result.
+    final PublishResult publishResult = Mockito.mock(PublishResult.class);
+    Mockito.when(publishResult.getSdkHttpMetadata()).thenReturn(httpMetadata);
+    Mockito.when(publishResult.getMessageId()).thenReturn(UUID.randomUUID().toString());
+    return publishResult;
+  }
+}
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/SnsIOTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/SnsIOTest.java
new file mode 100644
index 00000000000..338abc65cf1
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/sns/SnsIOTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.aws.sns;
+
+import static org.junit.Assert.fail;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Verifies {@link SnsIO} writes against mocked SNS clients. */
+@RunWith(JUnit4.class)
+public class SnsIOTest implements Serializable {
+
+  private static final String topicName = "arn:aws:sns:us-west-2:5880:topic-FMFEHJ47NRFO";
+
+  @Rule public TestPipeline p = TestPipeline.create();
+  @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(SnsIO.class);
+
+  /** Builds a publish request for the test topic carrying {@code message}. */
+  private static PublishRequest createSampleMessage(String message) {
+    return new PublishRequest().withTopicArn(topicName).withMessage(message);
+  }
+
+  /** Clients provider that hands the supplied SNS client to the transform under test. */
+  private static class Provider implements AwsClientsProvider {
+
+    // NOTE(review): the field is static yet assigned from the constructor, so all Provider
+    // instances share whichever client was supplied last - confirm this is intentional.
+    private static AmazonSNS publisher;
+
+    public Provider(AmazonSNS pub) {
+      publisher = pub;
+    }
+
+    @Override
+    public AmazonCloudWatch getCloudWatchClient() {
+      return Mockito.mock(AmazonCloudWatch.class);
+    }
+
+    @Override
+    public AmazonSNS createSnsPublisher() {
+      return publisher;
+    }
+  }
+
+  /** Publishes two messages through the succeeding mock and expects two publish results. */
+  @Test
+  public void testDataWritesToSNS() {
+    final PublishRequest firstRequest = createSampleMessage("my_first_message");
+    final PublishRequest secondRequest = createSampleMessage("my_second_message");
+    final TupleTag<PublishResult> resultTag = new TupleTag<>();
+
+    final PCollection<PublishRequest> requests = p.apply(Create.of(firstRequest, secondRequest));
+    final PCollectionTuple writeOutputs =
+        requests.apply(
+            SnsIO.write()
+                .withTopicName(topicName)
+                .withRetryConfiguration(
+                    SnsIO.RetryConfiguration.create(5, org.joda.time.Duration.standardMinutes(1)))
+                .withAWSClientsProvider(new Provider(new AmazonSNSMockSuccess()))
+                .withResultOutputTag(resultTag));
+
+    final PCollection<Long> publishedCount = writeOutputs.get(resultTag).apply(Count.globally());
+    PAssert.that(publishedCount).containsInAnyOrder(ImmutableList.of(2L));
+    p.run().waitUntilFinish();
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Publishes through the always-failing mock and expects the pipeline to fail with an "Error
+   * writing to SNS" message after retry attempts 1 to 3 have been logged as warnings.
+   */
+  @Test
+  public void testRetries() throws Throwable {
+    thrown.expectMessage("Error writing to SNS");
+    final PublishRequest unpublishable =
+        createSampleMessage("my message that will not be published");
+    final TupleTag<PublishResult> resultTag = new TupleTag<>();
+    p.apply(Create.of(unpublishable))
+        .apply(
+            SnsIO.write()
+                .withTopicName(topicName)
+                .withRetryConfiguration(
+                    SnsIO.RetryConfiguration.create(4, org.joda.time.Duration.standardSeconds(10)))
+                .withAWSClientsProvider(new Provider(new AmazonSNSMockErrors()))
+                .withResultOutputTag(resultTag));
+
+    try {
+      p.run();
+    } catch (final Pipeline.PipelineExecutionException e) {
+      // Confirm the three retry attempts were logged before surfacing the underlying cause.
+      for (int attempt = 1; attempt <= 3; attempt++) {
+        expectedLogs.verifyWarn(
+            String.format(SnsIO.Write.SnsWriterFn.RETRY_ATTEMPT_LOG, attempt));
+      }
+      throw e.getCause();
+    }
+    fail("Pipeline is expected to fail because we were unable to write to SNS.");
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137588)
    Time Spent: 4h 40m  (was: 4.5h)

> Create SNS IO 
> --------------
>
>                 Key: BEAM-4823
>                 URL: https://issues.apache.org/jira/browse/BEAM-4823
>             Project: Beam
>          Issue Type: Task
>          Components: sdk-java-core
>    Affects Versions: Not applicable
>            Reporter: Ankit Jhalaria
>            Assignee: Ankit Jhalaria
>            Priority: Minor
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to