[ 
https://issues.apache.org/jira/browse/BEAM-8542?focusedWorklogId=343082&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343082
 ]

ASF GitHub Bot logged work on BEAM-8542:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Nov/19 02:18
            Start Date: 14/Nov/19 02:18
    Worklog Time Spent: 10m 
      Work Description: cmachgodaddy commented on pull request #10078: 
[BEAM-8542] Change write to async in AWS SNS IO & remove retry logic
URL: https://github.com/apache/beam/pull/10078#discussion_r346087821
 
 

 ##########
 File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
 ##########
 @@ -62,310 +58,272 @@
  * PCollection<String> data = ...;
  *
  * data.apply(SnsIO.<String>write()
- *     .withPublishRequestFn(m -> 
PublishRequest.builder().topicArn("topicArn").message(m).build())
- *     .withTopicArn("topicArn")
- *     .withRetryConfiguration(
- *        SnsIO.RetryConfiguration.create(
- *          4, org.joda.time.Duration.standardSeconds(10)))
- *     .withSnsClientProvider(new 
BasicSnsClientProvider(awsCredentialsProvider, region));
+ *             .withElementCoder(StringUtf8Coder.of())
+ *             .withPublishRequestFn(createPublishRequestFn())
+ *             .withSnsClientProvider(new 
BasicSnsClientProvider(awsCredentialsProvider, region));
+ *
  * }</pre>
  *
  * <p>As a client, you need to provide at least the following things:
  *
  * <ul>
- *   <li>SNS topic arn you're going to publish to
- *   <li>Retry Configuration
- *   <li>AwsCredentialsProvider, which you can pass on to 
BasicSnsClientProvider
+ *   <li>Coder for element T.
  *   <li>publishRequestFn, a function to convert your message into 
PublishRequest
+ *   <li>SnsClientProvider, a provider to create an async client.
  * </ul>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public final class SnsIO {
-
-  // Write data tp SNS
-  public static <T> Write<T> write() {
-    return new AutoValue_SnsIO_Write.Builder().build();
+final class SnsIO {
+  // Write data to SNS
+  static <T> Write<T> write() {
+    return new 
AutoValue_SnsIO_Write.Builder<T>().setThrowOnError(false).build();
   }
 
-  /**
-   * A POJO encapsulating a configuration for retry behavior when issuing 
requests to SNS. A retry
-   * will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes first, for
-   * any of the following exceptions:
-   *
-   * <ul>
-   *   <li>{@link IOException}
-   * </ul>
-   */
-  @AutoValue
-  public abstract static class RetryConfiguration implements Serializable {
-    @VisibleForTesting
-    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new 
DefaultRetryPredicate();
-
-    abstract int getMaxAttempts();
-
-    abstract Duration getMaxDuration();
-
-    abstract RetryPredicate getRetryPredicate();
-
-    abstract Builder builder();
-
-    public static RetryConfiguration create(int maxAttempts, Duration 
maxDuration) {
-      checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
-      checkArgument(
-          maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
-          "maxDuration should be greater than 0");
-      return new AutoValue_SnsIO_RetryConfiguration.Builder()
-          .setMaxAttempts(maxAttempts)
-          .setMaxDuration(maxDuration)
-          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
-          .build();
-    }
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setMaxAttempts(int maxAttempts);
-
-      abstract Builder setMaxDuration(Duration maxDuration);
-
-      abstract Builder setRetryPredicate(RetryPredicate retryPredicate);
-
-      abstract RetryConfiguration build();
-    }
-
-    /**
-     * An interface used to control if we retry the SNS Publish call when a 
{@link Throwable}
-     * occurs. If {@link RetryPredicate#test(Object)} returns true, {@link 
Write} tries to resend
-     * the requests to the Solr server if the {@link RetryConfiguration} 
permits it.
-     */
-    @FunctionalInterface
-    interface RetryPredicate extends Predicate<Throwable>, Serializable {}
-
-    private static class DefaultRetryPredicate implements RetryPredicate {
-      private static final ImmutableSet<Integer> ELIGIBLE_CODES =
-          ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE);
-
-      @Override
-      public boolean test(Throwable throwable) {
-        return (throwable instanceof IOException
-            || (throwable instanceof InternalErrorException)
-            || (throwable instanceof InternalErrorException
-                && ELIGIBLE_CODES.contains(((InternalErrorException) 
throwable).statusCode())));
-      }
-    }
-  }
-
-  /** Implementation of {@link #write}. */
+  /** Implementation of {@link #write()}. */
   @AutoValue
   public abstract static class Write<T>
-      extends PTransform<PCollection<T>, PCollection<PublishResponse>> {
-    @Nullable
-    abstract String getTopicArn();
+      extends PTransform<PCollection<T>, PCollection<WrappedSnsResponse<T>>> {
 
     @Nullable
-    abstract SerializableFunction<T, PublishRequest> getPublishRequestFn();
+    abstract SnsClientProvider getSnsClientProvider();
+
+    /** Flag to indicate if io should throw an error on exception. */
+    abstract boolean getThrowOnError();
 
+    /** Coder for element T. */
     @Nullable
-    abstract SnsClientProvider getSnsClientProvider();
+    abstract Coder<T> getElementCoder();
 
+    /** SerializableFunction to create PublishRequest. */
     @Nullable
-    abstract RetryConfiguration getRetryConfiguration();
+    abstract SerializableFunction<T, PublishRequest> getPublishRequestFn();
 
     abstract Builder<T> builder();
 
     @AutoValue.Builder
     abstract static class Builder<T> {
+      abstract Builder<T> setSnsClientProvider(SnsClientProvider 
clientProvider);
 
-      abstract Builder<T> setTopicArn(String topicArn);
+      abstract Builder<T> setThrowOnError(boolean throwOnError);
+
+      abstract Builder<T> setElementCoder(Coder<T> elementCoder);
 
       abstract Builder<T> setPublishRequestFn(
           SerializableFunction<T, PublishRequest> publishRequestFn);
 
-      abstract Builder<T> setSnsClientProvider(SnsClientProvider 
snsClientProvider);
-
-      abstract Builder<T> setRetryConfiguration(RetryConfiguration 
retryConfiguration);
-
       abstract Write<T> build();
     }
 
     /**
-     * Specify the SNS topic which will be used for writing, this name is 
mandatory.
+     * Allows to specify a flag to indicate if io should throw an error on 
exception.
      *
-     * @param topicArn topicArn
+     * @param throwOnError flag.
      */
-    public Write<T> withTopicArn(String topicArn) {
-      return builder().setTopicArn(topicArn).build();
+    Write<T> withThrowOnError(boolean throwOnError) {
+      return builder().setThrowOnError(throwOnError).build();
     }
 
     /**
-     * Specify a function for converting a message into PublishRequest object, 
this function is
-     * mandatory.
+     * Specify a function for converting a message into PublishRequest object.
+     *
+     * @param elementCoder Coder
+     */
+    Write<T> withElementCoder(Coder<T> elementCoder) {
+      checkNotNull(elementCoder, "elementCoder cannot be null");
+      return builder().setElementCoder(elementCoder).build();
+    }
+
+    /**
+     * Specify a function for converting a message into PublishRequest object.
      *
      * @param publishRequestFn publishRequestFn
      */
-    public Write<T> withPublishRequestFn(SerializableFunction<T, 
PublishRequest> publishRequestFn) {
+    Write<T> withPublishRequestFn(SerializableFunction<T, PublishRequest> 
publishRequestFn) {
+      checkNotNull(publishRequestFn, "publishRequestFn cannot be null");
       return builder().setPublishRequestFn(publishRequestFn).build();
     }
 
     /**
      * Allows to specify custom {@link SnsClientProvider}. {@link 
SnsClientProvider} creates new
-     * {@link SnsClient} which is later used for writing to a SNS topic.
+     * {@link software.amazon.awssdk.services.sns.SnsAsyncClient} which is 
later used for writing to
+     * a SNS topic.
      */
-    public Write<T> withSnsClientProvider(SnsClientProvider 
awsClientsProvider) {
-      return builder().setSnsClientProvider(awsClientsProvider).build();
+    Write<T> withSnsClientProvider(SnsClientProvider snsClientsProvider) {
+      checkNotNull(snsClientsProvider, "snsClientsProvider cannot be null");
+      return builder().setSnsClientProvider(snsClientsProvider).build();
     }
 
     /**
-     * Specify {@link AwsCredentialsProvider} and region to be used to write 
to SNS. If you need
-     * more sophisticated credential protocol, then you should look at {@link
+     * Specify credential details and region to be used to write to SNS. If 
you need more
+     * sophisticated credential protocol, then you should look at {@link
      * Write#withSnsClientProvider(SnsClientProvider)}.
      */
-    public Write<T> withSnsClientProvider(
-        AwsCredentialsProvider credentialsProvider, String region) {
+    Write<T> withSnsClientProvider(AwsCredentialsProvider credentialsProvider, 
String region) {
+      checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
+      checkNotNull(region, "region cannot be null");
       return withSnsClientProvider(credentialsProvider, region, null);
     }
 
     /**
-     * Specify {@link AwsCredentialsProvider} and region to be used to write 
to SNS. If you need
-     * more sophisticated credential protocol, then you should look at {@link
+     * Specify credential details and region to be used to write to SNS. If 
you need more
+     * sophisticated credential protocol, then you should look at {@link
      * Write#withSnsClientProvider(SnsClientProvider)}.
      *
-     * <p>The {@code serviceEndpoint} sets an alternative service host. This 
is useful to execute
-     * the tests with Kinesis service emulator.
+     * <p>The {@code serviceEndpoint} sets an alternative service host.
      */
-    public Write<T> withSnsClientProvider(
+    Write<T> withSnsClientProvider(
         AwsCredentialsProvider credentialsProvider, String region, URI 
serviceEndpoint) {
+      checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
+      checkNotNull(region, "region cannot be null");
       return withSnsClientProvider(
           new BasicSnsClientProvider(credentialsProvider, region, 
serviceEndpoint));
     }
 
-    /**
-     * Provides configuration to retry a failed request to publish a message 
to SNS. Users should
-     * consider that retrying might compound the underlying problem which 
caused the initial
-     * failure. Users should also be aware that once retrying is exhausted the 
error is surfaced to
-     * the runner which <em>may</em> then opt to retry the current partition 
in entirety or abort if
-     * the max number of retries of the runner is completed. Retrying uses an 
exponential backoff
-     * algorithm, with minimum backoff of 5 seconds and then surfacing the 
error once the maximum
-     * number of retries or maximum configuration duration is exceeded.
-     *
-     * <p>Example use:
-     *
-     * <pre>{@code
-     * SnsIO.write()
-     *   .withRetryConfiguration(SnsIO.RetryConfiguration.create(5, 
Duration.standardMinutes(1))
-     *   ...
-     * }</pre>
-     *
-     * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
-     */
-    public Write<T> withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
-      checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
-      return builder().setRetryConfiguration(retryConfiguration).build();
-    }
-
-    private static boolean isTopicExists(SnsClient client, String topicArn) {
-      try {
-        GetTopicAttributesRequest getTopicAttributesRequest =
-            GetTopicAttributesRequest.builder().topicArn(topicArn).build();
-        GetTopicAttributesResponse topicAttributesResponse =
-            client.getTopicAttributes(getTopicAttributesRequest);
-        return topicAttributesResponse != null
-            && topicAttributesResponse.sdkHttpResponse().statusCode() == 200;
-      } catch (Exception e) {
-        throw e;
-      }
-    }
-
     @Override
-    public PCollection<PublishResponse> expand(PCollection<T> input) {
-      checkArgument(getTopicArn() != null, "withTopicArn() is required");
-      checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() is 
required");
-      checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() 
is required");
-      checkArgument(
-          isTopicExists(getSnsClientProvider().getSnsClient(), getTopicArn()),
-          "Topic arn %s does not exist",
-          getTopicArn());
-
-      return input.apply(ParDo.of(new SnsWriterFn<>(this)));
+    public PCollection<WrappedSnsResponse<T>> expand(PCollection<T> input) {
+      checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() 
needs to called");
+      checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() 
needs to called");
+      checkArgument(getElementCoder() != null, "withElementCoder() needs to 
called");
+
+      return input
+          .apply(ParDo.of(new SnsAsyncWriterFn<>(this)))
+          .setCoder(WrappedSnsResponseCoder.of(getElementCoder()));
     }
 
-    static class SnsWriterFn<T> extends DoFn<T, PublishResponse> {
-      @VisibleForTesting
-      static final String RETRY_ATTEMPT_LOG = "Error writing to SNS. Retry 
attempt[%d]";
+    private static class SnsAsyncWriterFn<T> extends DoFn<T, 
WrappedSnsResponse<T>> {
+      private static final Logger LOG = 
LoggerFactory.getLogger(SnsAsyncWriterFn.class);
+      private static final String ERROR_MESSAGE_DELIMITER = " ";
 
-      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
-      private transient FluentBackoff retryBackoff; // defaults to no retries
-      private static final Logger LOG = 
LoggerFactory.getLogger(SnsWriterFn.class);
-      private static final Counter SNS_WRITE_FAILURES =
-          Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");
+      private final Write<T> spec;
+      private transient SnsAsyncClient client;
+      private final ConcurrentLinkedQueue<SnsWriteException> failures;
+      private final ConcurrentLinkedQueue<KV<WrappedSnsResponse<T>, 
WindowedValue<T>>> results;
 
-      private final Write spec;
-      private transient SnsClient producer;
-
-      SnsWriterFn(Write spec) {
+      SnsAsyncWriterFn(Write<T> spec) {
         this.spec = spec;
+        this.failures = new ConcurrentLinkedQueue<>();
+        this.results = new ConcurrentLinkedQueue<>();
       }
 
       @Setup
-      public void setup() throws Exception {
-        // Initialize SnsPublisher
-        producer = spec.getSnsClientProvider().getSnsClient();
-
-        retryBackoff =
-            FluentBackoff.DEFAULT
-                .withMaxRetries(0) // default to no retrying
-                .withInitialBackoff(RETRY_INITIAL_BACKOFF);
-        if (spec.getRetryConfiguration() != null) {
-          retryBackoff =
-              retryBackoff
-                  
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
-                  
.withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());
-        }
+      public void setup() {
+        this.client = spec.getSnsClientProvider().getSnsAsyncClient();
       }
 
+      @SuppressWarnings("FutureReturnValueIgnored")
       @ProcessElement
-      public void processElement(ProcessContext context) throws Exception {
-        PublishRequest request =
-            (PublishRequest) 
spec.getPublishRequestFn().apply(context.element());
-        Sleeper sleeper = Sleeper.DEFAULT;
-        BackOff backoff = retryBackoff.backoff();
-        int attempt = 0;
-        while (true) {
-          attempt++;
-          try {
-            PublishResponse pr = producer.publish(request);
-            context.output(pr);
-            break;
-          } catch (Exception ex) {
-            // Fail right away if there is no retry configuration
-            if (spec.getRetryConfiguration() == null
-                || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) 
{
-              SNS_WRITE_FAILURES.inc();
-              LOG.info("Unable to publish message {} due to {} ", 
request.message(), ex);
-              throw new IOException("Error writing to SNS (no attempt made to 
retry)", ex);
-            }
+      public void processElement(ProcessContext context, BoundedWindow 
elementWindow)
+          throws IOException {
+        T element = context.element();
+        Instant timestamp = context.timestamp();
+        PaneInfo paneInfo = context.pane();
+
+        WindowedValue<T> windowedValue =
+            WindowedValue.of(element, timestamp, elementWindow, paneInfo);
+
+        PublishRequest request = spec.getPublishRequestFn().apply(element);
+        
client.publish(request).whenComplete(whenCompleteAction(windowedValue));
 
-            if (!BackOffUtils.next(sleeper, backoff)) {
-              throw new IOException(
-                  String.format(
-                      "Error writing to SNS after %d attempt(s). No more 
attempts allowed",
-                      attempt),
-                  ex);
-            } else {
-              // Note: this used in test cases to verify behavior
-              LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
+        checkForFailures();
+      }
+
+      private SerializableBiConsumer<PublishResponse, ? super Throwable> 
whenCompleteAction(
+          final WindowedValue<T> windowedValue) {
+
+        T element = windowedValue.getValue();
+
+        return (response, exception) -> {
+          if (spec.getThrowOnError()) {
+            if (exception != null) {
+              failures.offer(
+                  new SnsWriteException(
+                      String.format("Error while publishing message: %s", 
element), exception));
+            } else if (!response.sdkHttpResponse().isSuccessful()) {
+              String customMessage =
+                  response.sdkHttpResponse().statusText().isPresent()
+                      ? String.format(
+                          "Reason for failure: %s", 
response.sdkHttpResponse().statusText().get())
+                      : "";
 
 Review comment:
   Wouldn't a blank error message be unhelpful for debugging?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 343082)
    Time Spent: 0.5h  (was: 20m)

> Add async write to AWS SNS IO & remove retry logic
> --------------------------------------------------
>
>                 Key: BEAM-8542
>                 URL: https://issues.apache.org/jira/browse/BEAM-8542
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws
>            Reporter: Ajo Thomas
>            Assignee: Ajo Thomas
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> - While working with SNS IO for one of my work-related projects, I found that 
> the IO uses synchronous publishes during writes. I had a simple mock pipeline 
> where I was reading from a Kinesis stream and publishing to SNS using 
> Beam's SNS IO. For comparison, I also had a Lambda which did the same using 
> asynchronous publishes but was about 5x faster. Changing the SNS IO to use 
> async publishes would improve publish latencies.
>  - SNS IO also has its own retry logic, which isn't required because SNS clients 
> can already handle retries. The retry logic in the SNS client is user-configurable, 
> so explicit retry logic in SNS IO is not required.
> I have a working version of the IO with these changes and will create a PR 
> linking to this ticket once I get some feedback here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to