reuvenlax commented on code in PR #26063:
URL: https://github.com/apache/beam/pull/26063#discussion_r1170485518
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java:
##########
@@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String
projectId, String topicName) {
public abstract static class OutgoingMessage implements Serializable {
/** Underlying Message. May not have publish timestamp set. */
- public abstract PubsubMessage message();
+ public abstract PubsubMessage getMessage();
/** Timestamp for element (ms since epoch). */
- public abstract long timestampMsSinceEpoch();
+ public abstract long getTimestampMsSinceEpoch();
/**
* If using an id attribute, the record id to associate with this record's
metadata so the
* receiver can reject duplicates. Otherwise {@literal null}.
*/
public abstract @Nullable String recordId();
+ public abstract @Nullable String topic();
+
public static OutgoingMessage of(
- PubsubMessage message, long timestampMsSinceEpoch, @Nullable String
recordId) {
- return new AutoValue_PubsubClient_OutgoingMessage(message,
timestampMsSinceEpoch, recordId);
+ PubsubMessage message,
+ long timestampMsSinceEpoch,
+ @Nullable String recordId,
+ String topic) {
+ return new AutoValue_PubsubClient_OutgoingMessage(
+ message, timestampMsSinceEpoch, recordId, topic);
}
public static OutgoingMessage of(
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
long timestampMsSinceEpoch,
- @Nullable String recordId) {
+ @Nullable String recordId,
+ String topic) {
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
* reviewers mentioned <a
*
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline
start time.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos =
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros =
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
* reviewers mentioned <a
*
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline
start time.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos =
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros =
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to
extract the topic from
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
* reviewers mentioned <a
*
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline
start time.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages =
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos =
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros =
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to
extract the topic from
+ * the record. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).
+ * to((ValueInSingleWindow<MyType> quote) -> {
+ * String country = quote.getValue().getCountry();
+ * return "projects/myproject/topics/events_" + country;
+ * }));
+ * }</pre>
+ *
+ * Dynamic topics can also be specified by writing {@link PubsubMessage}
objects containing the
+ * topic. For example:
+ *
+ * <pre>{@code
+ * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {})
+ * .via(e -> new PubsubMessage(
+ * e.toByteArray(),
+ * Collections.emptyMap()).withTopic("projects/myproject/topics/events_" + e.getCountry())))
+ * .apply(PubsubIO.writeMessagesDynamic());
+ * }</pre>
+ *
+ * <h3>Custom timestamps</h3>
+ *
+ * All messages read from PubSub have a stable publish timestamp that is
independent of when the
+ * message is read from the PubSub topic. By default, the publish time is used
as the timestamp for
+ * all messages read and the watermark is based on that. If there is a
different logical timestamp
+ * to be used, that timestamp must be published in a PubSub attribute. The
attribute is specified
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1350,23 +1385,15 @@ public void startBundle(StartBundleContext c) throws
IOException {
@ProcessElement
public void processElement(ProcessContext c) throws IOException,
SizeLimitExceededException {
PubsubMessage message = getFormatFn().apply(c.element());
- int messageSize = validateAndGetPubsubMessageSize(message);
- if (messageSize > maxPublishBatchByteSize) {
Review Comment:
sure, done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -242,7 +271,19 @@ private static class WriterFn extends DoFn<KV<Integer,
Iterable<OutgoingMessage>
/** BLOCKING Send {@code messages} as a batch to Pubsub. */
private void publishBatch(List<OutgoingMessage> messages, int bytes)
throws IOException {
- int n = pubsubClient.publish(topic.get(), messages);
+ int n = 0;
+ if (topic != null) {
+ n = pubsubClient.publish(topic.get(), messages);
+ } else {
+ Map<TopicPath, List<OutgoingMessage>> messagesPerTopic =
Maps.newHashMap();
+ for (OutgoingMessage message : messages) {
+ TopicPath topicPath =
PubsubClient.topicPathFromPath(message.topic());
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -495,6 +604,10 @@ static class PubsubSink extends
PTransform<PCollection<byte[]>, PDone> {
this.outer = outer;
}
+ boolean isDynamic() {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]