[ 
https://issues.apache.org/jira/browse/BEAM-2660?focusedWorklogId=133147&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133147
 ]

ASF GitHub Bot logged work on BEAM-2660:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Aug/18 18:41
            Start Date: 09/Aug/18 18:41
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #3619: [BEAM-2660] Set 
PubsubIO batch size using builder
URL: https://github.com/apache/beam/pull/3619
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5f4027adce5..e39b44f95bd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Message;
 import java.io.IOException;
@@ -31,6 +32,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import javax.naming.SizeLimitExceededException;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -732,9 +734,20 @@ private PubsubIO() {}
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
+    private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 10 * 1024 * 
1024;
+    private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+
     @Nullable
     abstract ValueProvider<PubsubTopic> getTopicProvider();
 
+    /** the batch size for bulk submissions to pubsub. */
+    @Nullable
+    abstract Integer getMaxBatchSize();
+
+    /** the maximum batch size, by bytes. */
+    @Nullable
+    abstract Integer getMaxBatchBytesSize();
+
     /** The name of the message attribute to publish message timestamps in. */
     @Nullable
     abstract String getTimestampAttribute();
@@ -753,6 +766,10 @@ private PubsubIO() {}
     abstract static class Builder<T> {
       abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> 
topicProvider);
 
+      abstract Builder<T> setMaxBatchSize(Integer batchSize);
+
+      abstract Builder<T> setMaxBatchBytesSize(Integer maxBatchBytesSize);
+
       abstract Builder<T> setTimestampAttribute(String timestampAttribute);
 
       abstract Builder<T> setIdAttribute(String idAttribute);
@@ -779,6 +796,29 @@ private PubsubIO() {}
           .build();
     }
 
+    /**
+     * Writes to Pub/Sub are batched to efficiently send data. The value of 
the attribute will be a
+     * number representing the number of Pub/Sub messages to queue before 
sending off the bulk
+     * request. For example, if given 1000 the write sink will wait until 1000 
messages have been
+     * received, or the pipeline has finished, whichever is first.
+     *
+     * <p>Pub/Sub has a limitation of 10mb per individual request/batch. This 
attribute was
+     * requested dynamic to allow larger Pub/Sub messages to be sent using 
this source. Thus
+     * allowing customizable batches and control of number of events before 
the 10mb size limit is
+     * hit.
+     */
+    public Write<T> withMaxBatchSize(int batchSize) {
+      return toBuilder().setMaxBatchSize(batchSize).build();
+    }
+
+    /**
+     * Writes to Pub/Sub are limited by 10mb in general. This attribute 
controls the maximum allowed
+     * bytes to be sent to Pub/Sub in a single batched message.
+     */
+    public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
+      return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
+    }
+
     /**
      * Writes to Pub/Sub and adds each record's timestamp to the published 
messages in an attribute
      * with the specified name. The value of the attribute will be a number 
representing the number
@@ -819,9 +859,15 @@ public PDone expand(PCollection<T> input) {
       if (getTopicProvider() == null) {
         throw new IllegalStateException("need to set the topic of a 
PubsubIO.Write transform");
       }
+
       switch (input.isBounded()) {
         case BOUNDED:
-          input.apply(ParDo.of(new PubsubBoundedWriter()));
+          input.apply(
+              ParDo.of(
+                  new PubsubBoundedWriter(
+                      MoreObjects.firstNonNull(getMaxBatchSize(), 
MAX_PUBLISH_BATCH_SIZE),
+                      MoreObjects.firstNonNull(
+                          getMaxBatchBytesSize(), 
MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
           return PDone.in(input.getPipeline());
         case UNBOUNDED:
           return input
@@ -832,7 +878,12 @@ public PDone expand(PCollection<T> input) {
                       NestedValueProvider.of(getTopicProvider(), new 
TopicPathTranslator()),
                       getTimestampAttribute(),
                       getIdAttribute(),
-                      100 /* numShards */));
+                      100 /* numShards */,
+                      MoreObjects.firstNonNull(
+                          getMaxBatchSize(), 
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
+                      MoreObjects.firstNonNull(
+                          getMaxBatchBytesSize(),
+                          PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
       }
       throw new RuntimeException(); // cases are exhaustive.
     }
@@ -850,14 +901,27 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
      * <p>Public so can be suppressed by runners.
      */
     public class PubsubBoundedWriter extends DoFn<T, Void> {
-
-      private static final int MAX_PUBLISH_BATCH_SIZE = 100;
       private transient List<OutgoingMessage> output;
       private transient PubsubClient pubsubClient;
+      private transient int currentOutputBytes;
+
+      private int maxPublishBatchByteSize;
+      private int maxPublishBatchSize;
+
+      PubsubBoundedWriter(int maxPublishBatchSize, int 
maxPublishBatchByteSize) {
+        this.maxPublishBatchSize = maxPublishBatchSize;
+        this.maxPublishBatchByteSize = maxPublishBatchByteSize;
+      }
+
+      PubsubBoundedWriter() {
+        this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
+      }
 
       @StartBundle
       public void startBundle(StartBundleContext c) throws IOException {
         this.output = new ArrayList<>();
+        this.currentOutputBytes = 0;
+
         // NOTE: idAttribute is ignored.
         this.pubsubClient =
             FACTORY.newClient(
@@ -865,17 +929,29 @@ public void startBundle(StartBundleContext c) throws 
IOException {
       }
 
       @ProcessElement
-      public void processElement(ProcessContext c) throws IOException {
+      public void processElement(ProcessContext c) throws IOException, 
SizeLimitExceededException {
         byte[] payload;
         PubsubMessage message = getFormatFn().apply(c.element());
         payload = message.getPayload();
         Map<String, String> attributes = message.getAttributeMap();
-        // NOTE: The record id is always null.
-        output.add(new OutgoingMessage(payload, attributes, 
c.timestamp().getMillis(), null));
 
-        if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
+        if (payload.length > maxPublishBatchByteSize) {
+          String msg =
+              String.format(
+                  "Pub/Sub message size (%d) exceeded maximum batch size (%d)",
+                  payload.length, maxPublishBatchByteSize);
+          throw new SizeLimitExceededException(msg);
+        }
+
+        // Checking before adding the message stops us from violating the max 
bytes
+        if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize)
+            || (output.size() >= maxPublishBatchSize)) {
           publish();
         }
+
+        // NOTE: The record id is always null.
+        output.add(new OutgoingMessage(payload, attributes, 
c.timestamp().getMillis(), null));
+        currentOutputBytes += payload.length;
       }
 
       @FinishBundle
@@ -884,6 +960,7 @@ public void finishBundle() throws IOException {
           publish();
         }
         output = null;
+        currentOutputBytes = 0;
         pubsubClient.close();
         pubsubClient = null;
       }
@@ -895,6 +972,7 @@ private void publish() throws IOException {
                 PubsubClient.topicPathFromName(topic.project, topic.topic), 
output);
         checkState(n == output.size());
         output.clear();
+        currentOutputBytes = 0;
       }
 
       @Override
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index e9c8e73b7f9..118b9317a2c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -84,10 +84,10 @@
  */
 public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>, PDone> {
   /** Default maximum number of messages per publish. */
-  private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
+  static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
 
   /** Default maximum size of a publish batch, in bytes. */
-  private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
+  static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
 
   /** Default longest delay between receiving a message and pushing it to 
Pubsub. */
   private static final Duration DEFAULT_MAX_LATENCY = 
Duration.standardSeconds(2);
@@ -367,6 +367,25 @@ public PubsubUnboundedSink(
         RecordIdMethod.RANDOM);
   }
 
+  public PubsubUnboundedSink(
+      PubsubClientFactory pubsubFactory,
+      ValueProvider<TopicPath> topic,
+      String timestampAttribute,
+      String idAttribute,
+      int numShards,
+      int publishBatchSize,
+      int publishBatchBytes) {
+    this(
+        pubsubFactory,
+        topic,
+        timestampAttribute,
+        idAttribute,
+        numShards,
+        publishBatchSize,
+        publishBatchBytes,
+        DEFAULT_MAX_LATENCY,
+        RecordIdMethod.RANDOM);
+  }
   /** Get the topic being written to. */
   public TopicPath getTopic() {
     return topic.get();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 133147)
    Time Spent: 4h  (was: 3h 50m)

> Set PubsubIO batch size using builder
> -------------------------------------
>
>                 Key: BEAM-2660
>                 URL: https://issues.apache.org/jira/browse/BEAM-2660
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Carl McGraw
>            Assignee: Chamikara Jayalath
>            Priority: Major
>              Labels: gcp, java, pubsub, sdk
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> PubsubIO doesn't allow users to set the publish batch size. Instead, the value 
> is hard-coded in both the BoundedPubsubWriter and the UnboundedPubsubSink. 
> Google's Pub/Sub is bound to a maximum of 10 MB per request. My company 
> has run into problems with events that are individually smaller than 1 MB but, 
> when batched with the default batch sizes of 100 or 2000, cause Pub/Sub to 
> fail to send the events.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to