[
https://issues.apache.org/jira/browse/BEAM-2660?focusedWorklogId=133147&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133147
]
ASF GitHub Bot logged work on BEAM-2660:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Aug/18 18:41
Start Date: 09/Aug/18 18:41
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #3619: [BEAM-2660] Set
PubsubIO batch size using builder
URL: https://github.com/apache/beam/pull/3619
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 5f4027adce5..e39b44f95bd 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Message;
import java.io.IOException;
@@ -31,6 +32,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import javax.naming.SizeLimitExceededException;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -732,9 +734,20 @@ private PubsubIO() {}
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>,
PDone> {
+ private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 10 * 1024 *
1024;
+ private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+
@Nullable
abstract ValueProvider<PubsubTopic> getTopicProvider();
+ /** The batch size for bulk submissions to Pub/Sub. */
+ @Nullable
+ abstract Integer getMaxBatchSize();
+
+ /** The maximum batch size, in bytes. */
+ @Nullable
+ abstract Integer getMaxBatchBytesSize();
+
/** The name of the message attribute to publish message timestamps in. */
@Nullable
abstract String getTimestampAttribute();
@@ -753,6 +766,10 @@ private PubsubIO() {}
abstract static class Builder<T> {
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic>
topicProvider);
+ abstract Builder<T> setMaxBatchSize(Integer batchSize);
+
+ abstract Builder<T> setMaxBatchBytesSize(Integer maxBatchBytesSize);
+
abstract Builder<T> setTimestampAttribute(String timestampAttribute);
abstract Builder<T> setIdAttribute(String idAttribute);
@@ -779,6 +796,29 @@ private PubsubIO() {}
.build();
}
+ /**
+ * Writes to Pub/Sub are batched to send data efficiently. The value is the maximum
+ * number of Pub/Sub messages to queue before sending off the bulk request. For example,
+ * if given 1000, the write sink will wait until 1000 messages have been
+ * received, or the pipeline has finished, whichever comes first.
+ *
+ * <p>Pub/Sub has a limit of 10MB per individual request/batch. This setting was made
+ * configurable so that larger Pub/Sub messages can be sent through this sink, allowing
+ * customizable batches and control over the number of events before the 10MB size limit is
+ * hit.
+ */
+ public Write<T> withMaxBatchSize(int batchSize) {
+ return toBuilder().setMaxBatchSize(batchSize).build();
+ }
+
+ /**
+ * Writes to Pub/Sub are generally limited to 10MB per request. This setting controls
+ * the maximum number of bytes allowed to be sent to Pub/Sub in a single batched request.
+ */
+ public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
+ return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
+ }
+
/**
* Writes to Pub/Sub and adds each record's timestamp to the published
messages in an attribute
* with the specified name. The value of the attribute will be a number
representing the number
@@ -819,9 +859,15 @@ public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null) {
throw new IllegalStateException("need to set the topic of a
PubsubIO.Write transform");
}
+
switch (input.isBounded()) {
case BOUNDED:
- input.apply(ParDo.of(new PubsubBoundedWriter()));
+ input.apply(
+ ParDo.of(
+ new PubsubBoundedWriter(
+ MoreObjects.firstNonNull(getMaxBatchSize(),
MAX_PUBLISH_BATCH_SIZE),
+ MoreObjects.firstNonNull(
+ getMaxBatchBytesSize(),
MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
return PDone.in(input.getPipeline());
case UNBOUNDED:
return input
@@ -832,7 +878,12 @@ public PDone expand(PCollection<T> input) {
NestedValueProvider.of(getTopicProvider(), new
TopicPathTranslator()),
getTimestampAttribute(),
getIdAttribute(),
- 100 /* numShards */));
+ 100 /* numShards */,
+ MoreObjects.firstNonNull(
+ getMaxBatchSize(),
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
+ MoreObjects.firstNonNull(
+ getMaxBatchBytesSize(),
+ PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
}
throw new RuntimeException(); // cases are exhaustive.
}
@@ -850,14 +901,27 @@ public void populateDisplayData(DisplayData.Builder
builder) {
* <p>Public so can be suppressed by runners.
*/
public class PubsubBoundedWriter extends DoFn<T, Void> {
-
- private static final int MAX_PUBLISH_BATCH_SIZE = 100;
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
+ private transient int currentOutputBytes;
+
+ private int maxPublishBatchByteSize;
+ private int maxPublishBatchSize;
+
+ PubsubBoundedWriter(int maxPublishBatchSize, int
maxPublishBatchByteSize) {
+ this.maxPublishBatchSize = maxPublishBatchSize;
+ this.maxPublishBatchByteSize = maxPublishBatchByteSize;
+ }
+
+ PubsubBoundedWriter() {
+ this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
+ }
@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
this.output = new ArrayList<>();
+ this.currentOutputBytes = 0;
+
// NOTE: idAttribute is ignored.
this.pubsubClient =
FACTORY.newClient(
@@ -865,17 +929,29 @@ public void startBundle(StartBundleContext c) throws
IOException {
}
@ProcessElement
- public void processElement(ProcessContext c) throws IOException {
+ public void processElement(ProcessContext c) throws IOException,
SizeLimitExceededException {
byte[] payload;
PubsubMessage message = getFormatFn().apply(c.element());
payload = message.getPayload();
Map<String, String> attributes = message.getAttributeMap();
- // NOTE: The record id is always null.
- output.add(new OutgoingMessage(payload, attributes,
c.timestamp().getMillis(), null));
- if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
+ if (payload.length > maxPublishBatchByteSize) {
+ String msg =
+ String.format(
+ "Pub/Sub message size (%d) exceeded maximum batch size (%d)",
+ payload.length, maxPublishBatchByteSize);
+ throw new SizeLimitExceededException(msg);
+ }
+
+ // Checking before adding the message stops us from violating the max
bytes
+ if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize)
+ || (output.size() >= maxPublishBatchSize)) {
publish();
}
+
+ // NOTE: The record id is always null.
+ output.add(new OutgoingMessage(payload, attributes,
c.timestamp().getMillis(), null));
+ currentOutputBytes += payload.length;
}
@FinishBundle
@@ -884,6 +960,7 @@ public void finishBundle() throws IOException {
publish();
}
output = null;
+ currentOutputBytes = 0;
pubsubClient.close();
pubsubClient = null;
}
@@ -895,6 +972,7 @@ private void publish() throws IOException {
PubsubClient.topicPathFromName(topic.project, topic.topic),
output);
checkState(n == output.size());
output.clear();
+ currentOutputBytes = 0;
}
@Override
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index e9c8e73b7f9..118b9317a2c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -84,10 +84,10 @@
*/
public class PubsubUnboundedSink extends
PTransform<PCollection<PubsubMessage>, PDone> {
/** Default maximum number of messages per publish. */
- private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
+ static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
/** Default maximum size of a publish batch, in bytes. */
- private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
+ static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
/** Default longest delay between receiving a message and pushing it to
Pubsub. */
private static final Duration DEFAULT_MAX_LATENCY =
Duration.standardSeconds(2);
@@ -367,6 +367,25 @@ public PubsubUnboundedSink(
RecordIdMethod.RANDOM);
}
+ public PubsubUnboundedSink(
+ PubsubClientFactory pubsubFactory,
+ ValueProvider<TopicPath> topic,
+ String timestampAttribute,
+ String idAttribute,
+ int numShards,
+ int publishBatchSize,
+ int publishBatchBytes) {
+ this(
+ pubsubFactory,
+ topic,
+ timestampAttribute,
+ idAttribute,
+ numShards,
+ publishBatchSize,
+ publishBatchBytes,
+ DEFAULT_MAX_LATENCY,
+ RecordIdMethod.RANDOM);
+ }
/** Get the topic being written to. */
public TopicPath getTopic() {
return topic.get();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 133147)
Time Spent: 4h (was: 3h 50m)
> Set PubsubIO batch size using builder
> -------------------------------------
>
> Key: BEAM-2660
> URL: https://issues.apache.org/jira/browse/BEAM-2660
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Carl McGraw
> Assignee: Chamikara Jayalath
> Priority: Major
> Labels: gcp, java, pubsub, sdk
> Time Spent: 4h
> Remaining Estimate: 0h
>
> PubsubIO doesn't allow users to set the publish batch size. Instead the value
> is hard coded in both the BoundedPubsubWriter and the UnboundedPubsubSink.
> google's pub/sub is bound to a maximum of 10mb per request size. My company
> has run into problems with events that are individually smaller than 1mb, but
> when batched in the 100 or 2000 default batch sizes causes pubsub to fail to
> send the event.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)