reuvenlax commented on code in PR #26063:
URL: https://github.com/apache/beam/pull/26063#discussion_r1171791073
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1263,45 +1292,52 @@ public Write<T> withPubsubRootUrl(String pubsubRootUrl) {
@Override
public PDone expand(PCollection<T> input) {
- if (getTopicProvider() == null) {
- throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
+ if (getTopicProvider() == null && !getDynamicDestinations()) {
+ throw new IllegalStateException(
+ "need to set the topic of a PubsubIO.Write transform if not using "
+ + "dynamic topic destinations.");
}
+ SerializableFunction<ValueInSingleWindow<T>, PubsubIO.PubsubTopic> topicFunction =
+ getTopicFunction();
+ if (topicFunction == null && getTopicProvider() != null) {
+ topicFunction = v -> getTopicProvider().get();
+ }
+ int maxMessageSize = PUBSUB_MESSAGE_MAX_TOTAL_SIZE;
+ if (input.isBounded() == PCollection.IsBounded.BOUNDED) {
+ maxMessageSize =
+ Math.min(
+ maxMessageSize,
+ MoreObjects.firstNonNull(
+ getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT));
+ }
+ PCollection<PubsubMessage> pubsubMessages =
+ input.apply(
+ ParDo.of(new PreparePubsubWriteDoFn<>(getFormatFn(), topicFunction, maxMessageSize)));
switch (input.isBounded()) {
case BOUNDED:
- input.apply(
+ pubsubMessages.apply(
Review Comment:
Good point — fixed in BoundedWriter.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -479,13 +523,78 @@ public ValueProvider<TopicPath> getTopicProvider() {
@Override
public PDone expand(PCollection<PubsubMessage> input) {
- return input
- .apply(
- "Output Serialized PubsubMessage Proto",
- MapElements.into(new TypeDescriptor<byte[]>() {})
- .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
- .setCoder(ByteArrayCoder.of())
- .apply(new PubsubSink(this));
+ if (topic != null) {
+ return input
+ .apply(
+ "Output Serialized PubsubMessage Proto",
+ MapElements.into(new TypeDescriptor<byte[]>() {})
+ .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
+ .setCoder(ByteArrayCoder.of())
+ .apply(new PubsubSink(this));
+ } else {
+ // dynamic destinations.
+ return input
+ .apply(
+ "WithDynamicKeys",
Review Comment:
Done.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -268,9 +309,9 @@ public void startBundle(StartBundleContext c) throws Exception {
public void processElement(ProcessContext c) throws Exception {
List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
int bytes = 0;
- for (OutgoingMessage message : c.element().getValue()) {
+ for (OutgoingMessage message : c.element()) {
if (!pubsubMessages.isEmpty()
- && bytes + message.message().getData().size() > publishBatchBytes) {
+ && bytes + message.getMessage().getData().size() > publishBatchBytes) {
Review Comment:
Added grouping by topic and shard.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]