[
https://issues.apache.org/jira/browse/BEAM-4823?focusedWorklogId=136570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136570
]
ASF GitHub Bot logged work on BEAM-4823:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Aug/18 15:32
Start Date: 21/Aug/18 15:32
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #6202:
[BEAM-4823] - Adds a Sink to write to Amazon's SNS
URL: https://github.com/apache/beam/pull/6202#discussion_r211646489
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.sns;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.model.GetTopicAttributesResult;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform}s for writing to <a
href="https://aws.amazon.com/sns/">SNS</a>.
+ *
+ * <h3>Writing to SNS</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<PublishRequest> data = ...;
+ *
+ * data.apply(SnsIO.write()
+ * .withTopicName("topicName")
+ * .withMaxRetries(MAX_RETRIES) //eg. 2
+ * .withMaxDelay(MAX_DELAY) //eg. Duration.ofSeconds(2)
+ * .withRetryDelay(RETRY_DELAY) //eg. Duration.ofSeconds(1)
+ * .withAWSClientsProvider(new Provider(new AmazonSNSMock()))
+ * .withResultOutputTag(results));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following things:
+ *
+ * <ul>
+ *   <li>the name of the SNS topic you're going to write to
+ *   <li>the max number of retries to perform (defaults to 6 if not set)
+ *   <li>the maximum total delay
+ *   <li>the delay between retries
+ *   <li>an {@link AwsClientsProvider}; you can pass the default one, {@link BasicSnsProvider}
+ *   <li>an output tag where you can get results (see {@code SnsIOTest} for an example)
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public final class SnsIO {
+
+ private static final int DEFAULT_MAX_RETRIES = 6;
+
+ //Write data tp SNS
+ public static Write write() {
+ return new
AutoValue_SnsIO_Write.Builder().setMaxRetries(DEFAULT_MAX_RETRIES).build();
+ }
+
+ /** Implementation of {@link #write}. */
+ @AutoValue
+ public abstract static class Write
+ extends PTransform<PCollection<PublishRequest>, PCollectionTuple> {
+ /** SNS topic to publish to; {@link #expand} rejects a null value. */
+ @Nullable
+ abstract String getTopicName();
+
+ /** Provider whose {@code createSnsPublisher()} supplies the {@link AmazonSNS} client. */
+ @Nullable
+ abstract AwsClientsProvider getAWSClientsProvider();
+
+ /** Maximum total delay for retries; unset unless {@code withMaxDelay} is called. */
+ @Nullable
+ abstract Duration getMaxDelay();
+
+ /** Maximum number of retries; {@link SnsIO#write()} pre-populates it with 6. */
+ // NOTE(review): @Nonnull on a primitive int has no effect; consider dropping it.
+ @Nonnull
+ abstract int getMaxRetries();
+
+ /** Delay between retries; unset unless {@code withRetryDelay} is called. */
+ @Nullable
+ abstract Duration getRetryDelay();
+
+ /** Tag used as the main output of the writer {@code ParDo} in {@link #expand}. */
+ @Nullable
+ abstract TupleTag<PublishResult> getResultOutputTag();
+
+ /** Copies this spec into a builder so the {@code with*} methods can derive modified copies. */
+ abstract Builder builder();
+
+    /** AutoValue-generated builder behind the immutable copies returned by the {@code with*} methods. */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setTopicName(String name);
+
+      abstract Builder setAWSClientsProvider(AwsClientsProvider provider);
+
+      abstract Builder setMaxDelay(Duration delay);
+
+      abstract Builder setMaxRetries(int retries);
+
+      abstract Builder setRetryDelay(Duration delay);
+
+      abstract Builder setResultOutputTag(TupleTag<PublishResult> tag);
+
+      abstract Write build();
+    }
+
+ /** Specify the SNS topic which will be used for writing, this name is
mandatory. */
+ public Write withTopicName(String topicName) {
+ return builder().setTopicName(topicName).build();
+ }
+
+ /**
+ * Allows to specify custom {@link AwsClientsProvider}. {@link
AwsClientsProvider} creates new
+ * {@link AmazonSNS} which is later used for writing to a SNS topic.
+ */
+ public Write withAWSClientsProvider(AwsClientsProvider awsClientsProvider)
{
+ return builder().setAWSClientsProvider(awsClientsProvider).build();
+ }
+
+ /**
+ * Specify credential details and region to be used to write to SNS. If
you need more
+ * sophisticated credential protocol, then you should look at {@link
+ * Write#withAWSClientsProvider(AwsClientsProvider)}.
+ */
+ public Write withAWSClientsProvider(String awsAccessKey, String
awsSecretKey, Regions region) {
+ return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
+ }
+
+    /**
+     * Returns a copy of this transform that authenticates with the given access key and secret
+     * key in {@code region}, through a {@link BasicSnsProvider}. For other credential schemes pass
+     * a provider to {@link Write#withAWSClientsProvider(AwsClientsProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host. This is useful for running
+     * the tests against an SNS service emulator. It may be {@code null}, which is what the
+     * three-argument overload passes.
+     */
+    public Write withAWSClientsProvider(
+        String awsAccessKey, String awsSecretKey, Regions region, @Nullable String serviceEndpoint) {
+      return withAWSClientsProvider(
+          new BasicSnsProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
+    }
+
+    /**
+     * Sets the maximum total delay for retries. The value is a {@link Duration}, so any unit may
+     * be used (e.g. {@code Duration.ofSeconds(2)}). It has no default and is expected to be set.
+     */
+    public Write withMaxDelay(Duration maxDelay) {
+      return builder().setMaxDelay(maxDelay).build();
+    }
+
+    /**
+     * Sets the maximum number of retries to perform. Optional: {@link SnsIO#write()} already sets
+     * a default of 6.
+     */
+    public Write withMaxRetries(int maxRetries) {
+      return builder().setMaxRetries(maxRetries).build();
+    }
+
+    /**
+     * Sets the delay between retries. The value is a {@link Duration}, so any unit may be used
+     * (e.g. {@code Duration.ofSeconds(1)}). It has no default and is expected to be set.
+     */
+    public Write withRetryDelay(Duration retryDelay) {
+      return builder().setRetryDelay(retryDelay).build();
+    }
+
+ /** Tuple tag to store results. Mandatory field. */
+ public Write withResultOutputTag(TupleTag<PublishResult> results) {
+ return builder().setResultOutputTag(results).build();
+ }
+
+    /**
+     * Publishes each input {@link PublishRequest} through the writer {@code DoFn} and returns a
+     * tuple whose main output, under {@link #getResultOutputTag()}, holds the
+     * {@link PublishResult}s.
+     *
+     * @throws IllegalArgumentException if the topic name, clients provider or result output tag
+     *     has not been configured
+     */
+    @Override
+    public PCollectionTuple expand(PCollection<PublishRequest> input) {
+      checkArgument(getTopicName() != null, "withTopicName() is required");
+      // The writer DoFn dereferences the provider in @Setup; fail here at pipeline construction
+      // time rather than with an NPE on the workers.
+      checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
+      // The tag is the ParDo's main output; without it results have nowhere to go.
+      checkArgument(getResultOutputTag() != null, "withResultOutputTag() is required");
+      return input.apply(
+          ParDo.of(new SnsWriterFn(this))
+              .withOutputTags(getResultOutputTag(), TupleTagList.empty()));
+    }
+
+ private static class SnsWriterFn extends DoFn<PublishRequest,
PublishResult> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SnsWriterFn.class);
+ private static final Counter SNS_WRITE_FAILURES =
+ Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");
+
+ private final SnsIO.Write spec;
+ private transient AmazonSNS producer;
+ private transient RetryPolicy retryPolicy;
+
+ public SnsWriterFn(SnsIO.Write spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void setup() throws Exception {
+
+ //Initialize SnsPublisher
+ producer = spec.getAWSClientsProvider().createSnsPublisher();
+ retryPolicy =
Review comment:
Beam already has a [Backoff
facility](https://github.com/apache/beam/blob/20aff13d36b4ac7d9b9a29fd9f6fb3e80a84214a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java#L30),
so it is probably worth using that one to avoid the extra dependency (Failsafe). Please take a
look at the use of [Backoff in
SolrIO](https://github.com/apache/beam/blob/20aff13d36b4ac7d9b9a29fd9f6fb3e80a84214a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L607)
to see how it works; it seems like an easy rewrite. Note that the default there
is no retry, so it is probably worth doing the same here, and that the user can
parameterize the retry via a function, as is done [there
too](https://github.com/apache/beam/blob/20aff13d36b4ac7d9b9a29fd9f6fb3e80a84214a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L261).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 136570)
Time Spent: 2h 50m (was: 2h 40m)
> Create SNS IO
> --------------
>
> Key: BEAM-4823
> URL: https://issues.apache.org/jira/browse/BEAM-4823
> Project: Beam
> Issue Type: Task
> Components: sdk-java-core
> Affects Versions: Not applicable
> Reporter: Ankit Jhalaria
> Assignee: Ankit Jhalaria
> Priority: Minor
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)