dpcollins-google commented on code in PR #22612:
URL: https://github.com/apache/beam/pull/22612#discussion_r941383287
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java:
##########
@@ -134,19 +160,28 @@ private TopicPath getTopicPath() {
}
}
- @Override
- public PCollection<SequencedMessage> expand(PBegin input) {
- PCollection<SubscriptionPartition> subscriptionPartitions;
- subscriptionPartitions =
+ @SuppressWarnings("unused")
+ private PCollection<SequencedMessage> expandSdf(PBegin input) {
+ PCollection<SubscriptionPartition> subscriptionPartitions =
input.apply(new SubscriptionPartitionLoader(getTopicPath(),
options.subscriptionPath()));
-
return subscriptionPartitions.apply(
ParDo.of(
new PerSubscriptionPartitionSdf(
- new ManagedBacklogReaderFactoryImpl(this::newBacklogReader),
+ new ManagedFactoryImpl<>(this::newBacklogReader),
+ new ManagedFactoryImpl<>(this::newCommitter),
this::newInitialOffsetReader,
this::newRestrictionTracker,
- this::newPartitionProcessor,
- this::newCommitter)));
+ this::newPartitionProcessor)));
+ }
+
+ private PCollection<SequencedMessage> expandSource(PBegin input) {
+ return input.apply(
+ Read.from(
+ new UnboundedSourceImpl(options, this::newBufferedSubscriber,
this::newBacklogReader)));
+ }
+
+ @Override
+ public PCollection<SequencedMessage> expand(PBegin input) {
+ return expandSource(input);
Review Comment:
This will be deferred to a child CL, which will override this on Runner v2.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]