pabloem commented on code in PR #25411:
URL: https://github.com/apache/beam/pull/25411#discussion_r1105229529


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java:
##########
@@ -47,12 +57,34 @@ public GenerateInitialPartitionsAction(
    * The very first step of the pipeline when there are no partitions being streamed yet. We want to
    * get an initial list of partitions to stream and output them.
    *
-   * @return true if this pipeline should continue, otherwise false.
+   * @return {@link ProcessContinuation#resume()} if the stream continues, otherwise {@link
+   *     ProcessContinuation#stop()}
    */
-  public boolean run(
+  public ProcessContinuation run(
       OutputReceiver<PartitionRecord> receiver,
+      RestrictionTracker<OffsetRange, Long> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      Timestamp startTime) {
-    return true;
+      com.google.cloud.Timestamp startTime) {
+    if (!tracker.tryClaim(0L)) {
+      LOG.error(
+          "Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
+      return ProcessContinuation.stop();
+    }
+    List<ByteStringRange> streamPartitions =
+        changeStreamDao.generateInitialChangeStreamPartitions();
+
+    watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+    for (ByteStringRange partition : streamPartitions) {
+      metrics.incListPartitionsCount();
+      String uid = UniqueIdGenerator.getNextId();
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);

Review Comment:
   On the other hand, these are the initial partitions, so it probably does not
matter much which timestamps they carry (as long as the individual change
records have correct timestamps?)
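
To make the distinction concrete, here is a minimal sketch in plain Java with no Beam dependency. `TimestampSketch`, `Stamped`, `initialPartitions`, and `changeRecord` are hypothetical names used only for illustration, and `java.time.Instant` stands in for the `Instant` type in the diff. It assumes that change records are stamped with their own commit time downstream, which is the open question here rather than something this diff confirms.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class TimestampSketch {
  /** Hypothetical stand-in for an element paired with its event timestamp. */
  static final class Stamped {
    final String value;
    final Instant timestamp;

    Stamped(String value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /** Emits every initial partition at the epoch, as the diff does with Instant.EPOCH. */
  static List<Stamped> initialPartitions(List<String> partitions) {
    List<Stamped> out = new ArrayList<>();
    for (String partition : partitions) {
      out.add(new Stamped(partition, Instant.EPOCH));
    }
    return out;
  }

  /** Assumption: a change record carries its own commit time, not the partition's timestamp. */
  static Stamped changeRecord(Stamped partition, String mutation, Instant commitTime) {
    return new Stamped(partition.value + ":" + mutation, commitTime);
  }

  public static void main(String[] args) {
    List<Stamped> partitions = initialPartitions(List.of("a-m", "m-z"));
    Stamped record =
        changeRecord(partitions.get(0), "setCell", Instant.parse("2023-02-13T00:00:00Z"));
    // The partition record sits at the epoch; the change record keeps its commit time.
    System.out.println(partitions.get(0).timestamp + " -> " + record.timestamp);
  }
}
```

If that assumption holds, windowing on commit time would depend only on the change records' timestamps, not on the epoch timestamp given to the partition records.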



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.