boyuanzz commented on a change in pull request #13567:
URL: https://github.com/apache/beam/pull/13567#discussion_r556758148



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
##########
@@ -163,28 +161,6 @@ abstract Builder setBacklogReaderSupplier(
     abstract Builder setOffsetReaderSupplier(
         SerializableSupplier<InitialOffsetReader> offsetReaderSupplier);
 
-    // Used for implementing build();
-    abstract SubscriptionPath subscriptionPath();
-
-    abstract Set<Partition> partitions();
-
-    abstract SubscriberOptions autoBuild();
-
-    @SuppressWarnings("CheckReturnValue")
-    public SubscriberOptions build() throws ApiException {
-      if (!partitions().isEmpty()) {
-        return autoBuild();
-      }
-
-      if (partitions().isEmpty()) {
-        int partitionCount = 
PartitionLookupUtils.numPartitions(subscriptionPath());
-        ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
-        for (int i = 0; i < partitionCount; i++) {
-          partitions.add(Partition.of(i));
-        }
-        setPartitions(partitions.build());
-      }
-      return autoBuild();
-    }
+    public abstract SubscriberOptions build();

Review comment:
       You may want to update the javadoc of `public abstract Set<Partition> 
partitions();`. Based on your code, whether `partitions` is empty leads to 
different behavior.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class SubscriptionPartitionLoader extends PTransform<PBegin, 
PCollection<SubscriptionPartition>> {
+  private final TopicPath topic;
+  private final SubscriptionPath subscription;
+  private final SerializableFunction<TopicPath, Integer> getPartitionCount;
+  private final Duration pollDuration;
+  private final boolean terminate;
+
+  SubscriptionPartitionLoader(TopicPath topic, SubscriptionPath subscription) {
+    this(
+        topic,
+        subscription,
+        PartitionLookupUtils::numPartitions,
+        Duration.standardMinutes(1),
+        false);
+  }
+
+  @VisibleForTesting
+  SubscriptionPartitionLoader(
+      TopicPath topic,
+      SubscriptionPath subscription,
+      SerializableFunction<TopicPath, Integer> getPartitionCount,
+      Duration pollDuration,
+      boolean terminate) {
+    this.topic = topic;
+    this.subscription = subscription;
+    this.getPartitionCount = getPartitionCount;
+    this.pollDuration = pollDuration;
+    this.terminate = terminate;
+  }
+
+  @Override
+  public PCollection<SubscriptionPartition> expand(PBegin input) {
+    PCollection<TopicPath> start = 
input.apply(Create.of(ImmutableList.of(topic)));
+    PCollection<KV<TopicPath, Partition>> partitions =
+        start.apply(
+            Watch.growthOf(
+                    new PollFn<TopicPath, Partition>() {
+                      @Override
+                      public PollResult<Partition> apply(TopicPath element, 
Context c) {
+                        checkArgument(element.equals(topic));
+                        int partitionCount = getPartitionCount.apply(element);
+                        List<Partition> partitions =
+                            IntStream.range(0, partitionCount)
+                                .mapToObj(Partition::of)
+                                .collect(Collectors.toList());
+                        return PollResult.incomplete(Instant.EPOCH, 
partitions);
+                      }
+                    })
+                .withPollInterval(pollDuration)
+                .withTerminationPerInput(
+                    terminate
+                        ? 
Watch.Growth.afterTotalOf(pollDuration.multipliedBy(10))

Review comment:
       Even though `terminate` will not be `true` in this PR, I'm wondering 
how `pollDuration.multipliedBy(10)` was computed.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class SubscriptionPartitionLoader extends PTransform<PBegin, 
PCollection<SubscriptionPartition>> {

Review comment:
       `pollDuration` and `terminate` should be configurable from 
`PubSubLiteIO` with provided default values. What do you think?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class SubscriptionPartitionLoader extends PTransform<PBegin, 
PCollection<SubscriptionPartition>> {

Review comment:
       And for `terminate`, if you decide to use `Watch.Growth.afterTotalOf` as 
the termination condition, it would be better to have the time duration 
directly instead of a boolean. What do you think?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.PartitionLookupUtils;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class SubscriptionPartitionLoader extends PTransform<PBegin, 
PCollection<SubscriptionPartition>> {
+  private final TopicPath topic;
+  private final SubscriptionPath subscription;
+  private final SerializableFunction<TopicPath, Integer> getPartitionCount;
+  private final Duration pollDuration;
+  private final boolean terminate;
+
+  SubscriptionPartitionLoader(TopicPath topic, SubscriptionPath subscription) {
+    this(
+        topic,
+        subscription,
+        PartitionLookupUtils::numPartitions,
+        Duration.standardMinutes(1),
+        false);
+  }
+
+  @VisibleForTesting
+  SubscriptionPartitionLoader(
+      TopicPath topic,
+      SubscriptionPath subscription,
+      SerializableFunction<TopicPath, Integer> getPartitionCount,
+      Duration pollDuration,
+      boolean terminate) {
+    this.topic = topic;
+    this.subscription = subscription;
+    this.getPartitionCount = getPartitionCount;
+    this.pollDuration = pollDuration;
+    this.terminate = terminate;
+  }
+
+  @Override
+  public PCollection<SubscriptionPartition> expand(PBegin input) {
+    PCollection<TopicPath> start = 
input.apply(Create.of(ImmutableList.of(topic)));
+    PCollection<KV<TopicPath, Partition>> partitions =
+        start.apply(
+            Watch.growthOf(
+                    new PollFn<TopicPath, Partition>() {
+                      @Override
+                      public PollResult<Partition> apply(TopicPath element, 
Context c) {
+                        checkArgument(element.equals(topic));
+                        int partitionCount = getPartitionCount.apply(element);
+                        List<Partition> partitions =
+                            IntStream.range(0, partitionCount)
+                                .mapToObj(Partition::of)
+                                .collect(Collectors.toList());
+                        return PollResult.incomplete(Instant.EPOCH, 
partitions);

Review comment:
       Is it intended to always have the watermark as `Instant.EPOCH`? It will 
hold back the system watermark unnecessarily, and in the worst case 
downstream operations will have to wait for this watch transform to complete 
before they can process.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to