[ 
https://issues.apache.org/jira/browse/BEAM-8542?focusedWorklogId=343597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343597
 ]

ASF GitHub Bot logged work on BEAM-8542:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Nov/19 18:00
            Start Date: 14/Nov/19 18:00
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev commented on pull request #10078: 
[BEAM-8542] Change write to async in AWS SNS IO & remove retry logic
URL: https://github.com/apache/beam/pull/10078#discussion_r346458429
 
 

 ##########
 File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
 ##########
 @@ -62,310 +58,272 @@
  * PCollection<String> data = ...;
  *
  * data.apply(SnsIO.<String>write()
- *     .withPublishRequestFn(m -> PublishRequest.builder().topicArn("topicArn").message(m).build())
- *     .withTopicArn("topicArn")
- *     .withRetryConfiguration(
- *        SnsIO.RetryConfiguration.create(
- *          4, org.joda.time.Duration.standardSeconds(10)))
- *     .withSnsClientProvider(new BasicSnsClientProvider(awsCredentialsProvider, region));
+ *             .withElementCoder(StringUtf8Coder.of())
+ *             .withPublishRequestFn(createPublishRequestFn())
+ *             .withSnsClientProvider(new BasicSnsClientProvider(awsCredentialsProvider, region));
+ *
  * }</pre>
  *
  * <p>As a client, you need to provide at least the following things:
  *
  * <ul>
- *   <li>SNS topic arn you're going to publish to
- *   <li>Retry Configuration
- *   <li>AwsCredentialsProvider, which you can pass on to BasicSnsClientProvider
+ *   <li>Coder for element T.
  *   <li>publishRequestFn, a function to convert your message into PublishRequest
+ *   <li>SnsClientProvider, a provider to create an async client.
  * </ul>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public final class SnsIO {
-
-  // Write data tp SNS
-  public static <T> Write<T> write() {
-    return new AutoValue_SnsIO_Write.Builder().build();
+final class SnsIO {
+  // Write data to SNS
+  static <T> Write<T> write() {
+    return new AutoValue_SnsIO_Write.Builder<T>().setThrowOnError(false).build();
   }
 
-  /**
-   * A POJO encapsulating a configuration for retry behavior when issuing requests to SNS. A retry
-   * will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for
-   * any of the following exceptions:
-   *
-   * <ul>
-   *   <li>{@link IOException}
-   * </ul>
-   */
-  @AutoValue
-  public abstract static class RetryConfiguration implements Serializable {
-    @VisibleForTesting
-    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();
-
-    abstract int getMaxAttempts();
-
-    abstract Duration getMaxDuration();
-
-    abstract RetryPredicate getRetryPredicate();
-
-    abstract Builder builder();
-
-    public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
-      checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
-      checkArgument(
-          maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
-          "maxDuration should be greater than 0");
-      return new AutoValue_SnsIO_RetryConfiguration.Builder()
-          .setMaxAttempts(maxAttempts)
-          .setMaxDuration(maxDuration)
-          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
-          .build();
-    }
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setMaxAttempts(int maxAttempts);
-
-      abstract Builder setMaxDuration(Duration maxDuration);
-
-      abstract Builder setRetryPredicate(RetryPredicate retryPredicate);
-
-      abstract RetryConfiguration build();
-    }
-
-    /**
-     * An interface used to control if we retry the SNS Publish call when a {@link Throwable}
-     * occurs. If {@link RetryPredicate#test(Object)} returns true, {@link Write} tries to resend
-     * the requests to the Solr server if the {@link RetryConfiguration} permits it.
-     */
-    @FunctionalInterface
-    interface RetryPredicate extends Predicate<Throwable>, Serializable {}
-
-    private static class DefaultRetryPredicate implements RetryPredicate {
-      private static final ImmutableSet<Integer> ELIGIBLE_CODES =
-          ImmutableSet.of(HttpStatus.SC_SERVICE_UNAVAILABLE);
-
-      @Override
-      public boolean test(Throwable throwable) {
-        return (throwable instanceof IOException
-            || (throwable instanceof InternalErrorException)
-            || (throwable instanceof InternalErrorException
-                && ELIGIBLE_CODES.contains(((InternalErrorException) throwable).statusCode())));
-      }
-    }
-  }
-
-  /** Implementation of {@link #write}. */
+  /** Implementation of {@link #write()}. */
   @AutoValue
   public abstract static class Write<T>
-      extends PTransform<PCollection<T>, PCollection<PublishResponse>> {
-    @Nullable
-    abstract String getTopicArn();
+      extends PTransform<PCollection<T>, PCollection<WrappedSnsResponse<T>>> {
 
 Review comment:
   Why do we need to return `PCollection` and not just `PDone`?
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 343597)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add async write to AWS SNS IO & remove retry logic
> --------------------------------------------------
>
>                 Key: BEAM-8542
>                 URL: https://issues.apache.org/jira/browse/BEAM-8542
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws
>            Reporter: Ajo Thomas
>            Assignee: Ajo Thomas
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> - While working with SNS IO on a work-related project, I found that the IO 
> uses synchronous publishes during writes. I had a simple mock pipeline that 
> read from a Kinesis stream and published to SNS using Beam's SNS IO. For 
> comparison, I also had a Lambda that did the same using asynchronous 
> publishes and was about 5x faster. Changing the SNS IO to use async 
> publishes would improve publish latencies.
> - SNS IO also has its own retry logic, which isn't required because SNS 
> clients can handle retries. The retry logic in the SNS client is 
> user-configurable, so explicit retry logic in SNS IO is not needed.
> I have a working version of the IO with these changes and will create a PR 
> linking this ticket to it once I get some feedback here.
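
The difference between synchronous and asynchronous publishing that the issue describes can be sketched in plain Java. This is a minimal illustration, not the code from the PR: `fakePublish` is a hypothetical stand-in for an SNS publish call (it only sleeps to simulate network latency), and the class and method names are assumptions made for this example. A real async SNS client would return futures directly instead of relying on a thread pool as done here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AsyncPublishSketch {

  // Hypothetical stand-in for a publish call: blocks briefly, then returns a fake message id.
  static String fakePublish(String message) {
    try {
      Thread.sleep(50); // simulated network round trip
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return "id-" + message;
  }

  // Synchronous style: each publish waits for the previous one to finish.
  static List<String> publishSync(List<String> messages) {
    List<String> ids = new ArrayList<>();
    for (String m : messages) {
      ids.add(fakePublish(m));
    }
    return ids;
  }

  // Asynchronous style: start every publish first, then wait for all results.
  static List<String> publishAsync(List<String> messages, ExecutorService pool) {
    List<CompletableFuture<String>> futures =
        messages.stream()
            .map(m -> CompletableFuture.supplyAsync(() -> fakePublish(m), pool))
            .collect(Collectors.toList());
    // join() blocks until each future completes; input order is preserved.
    return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> messages = Arrays.asList("a", "b", "c", "d");
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      System.out.println("sync:  " + publishSync(messages));
      System.out.println("async: " + publishAsync(messages, pool));
    } finally {
      pool.shutdown();
    }
  }
}
```

Both methods return the same ids in the same order. In the async variant the simulated latencies overlap instead of adding up, which is the effect the issue attributes to async publishes.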



