This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by 
this push:
     new e743ebaf818 Implement generating initial list of partitions (#25411)
e743ebaf818 is described below

commit e743ebaf818039a01d810b96858c4270be05d638
Author: Tony Tang <[email protected]>
AuthorDate: Mon Feb 13 21:37:43 2023 -0500

    Implement generating initial list of partitions (#25411)
---
 .../changestreams/ChangeStreamMetrics.java         |  25 ++++
 .../action/DetectNewPartitionsAction.java          |  10 +-
 .../action/GenerateInitialPartitionsAction.java    |  42 ++++++-
 .../changestreams/dao/ChangeStreamDao.java         |  11 ++
 .../action/DetectNewPartitionsActionTest.java      | 127 +++++++++++++++++++++
 .../GenerateInitialPartitionsActionTest.java       | 119 +++++++++++++++++++
 6 files changed, 326 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index 2aaa6631ace..f8177adf873 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -18,8 +18,33 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
 
 import java.io.Serializable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 
 /** Class to aggregate metrics related functionality. */
 public class ChangeStreamMetrics implements Serializable {
   private static final long serialVersionUID = 7298901109362981596L;
+  // ------------------------
+  // Partition record metrics
+
+  /**
+   * Counter for the total number of partitions identified during the 
execution of the Connector.
+   */
+  public static final Counter LIST_PARTITIONS_COUNT =
+      Metrics.counter(
+          
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+          "list_partitions_count");
+
+  /**
+   * Increments the {@link
+   * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT}
 by
+   * 1 if the metric is enabled.
+   */
+  public void incListPartitionsCount() {
+    inc(LIST_PARTITIONS_COUNT);
+  }
+
+  private void inc(Counter counter) {
+    counter.inc();
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 990d9b2fd01..932796d4ba8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -25,7 +25,8 @@ import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -93,11 +94,14 @@ public class DetectNewPartitionsAction {
   @VisibleForTesting
   public ProcessContinuation run(
       RestrictionTracker<OffsetRange, Long> tracker,
-      DoFn.OutputReceiver<PartitionRecord> receiver,
+      OutputReceiver<PartitionRecord> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      DoFn.BundleFinalizer bundleFinalizer,
+      BundleFinalizer bundleFinalizer,
       Timestamp startTime)
       throws Exception {
+    if (tracker.currentRestriction().getFrom() == 0L) {
+      return generateInitialPartitionsAction.run(receiver, tracker, 
watermarkEstimator, startTime);
+    }
 
     // Terminate if endTime <= watermark that means all partitions have read 
up to or beyond
     // watermark. We no longer need to manage splits and merges, we can 
terminate.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
index be61c9a9e9b..164a679a26f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
 import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to generate first set of outputs for {@link
  * 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn}.
  */
-@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
 public class GenerateInitialPartitionsAction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GenerateInitialPartitionsAction.class);
+
   private final ChangeStreamMetrics metrics;
   private final ChangeStreamDao changeStreamDao;
   @Nullable private final Timestamp endTime;
@@ -47,12 +57,34 @@ public class GenerateInitialPartitionsAction {
    * The very first step of the pipeline when there are no partitions being 
streamed yet. We want to
    * get an initial list of partitions to stream and output them.
    *
-   * @return true if this pipeline should continue, otherwise false.
+   * @return {@link ProcessContinuation#resume()} if the stream continues, 
otherwise {@link
+   *     ProcessContinuation#stop()}
    */
-  public boolean run(
+  public ProcessContinuation run(
       OutputReceiver<PartitionRecord> receiver,
+      RestrictionTracker<OffsetRange, Long> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      Timestamp startTime) {
-    return true;
+      com.google.cloud.Timestamp startTime) {
+    if (!tracker.tryClaim(0L)) {
+      LOG.error(
+          "Could not claim initial DetectNewPartition restriction. No 
partitions are outputted.");
+      return ProcessContinuation.stop();
+    }
+    List<ByteStringRange> streamPartitions =
+        changeStreamDao.generateInitialChangeStreamPartitions();
+
+    watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+    for (ByteStringRange partition : streamPartitions) {
+      metrics.incListPartitionsCount();
+      String uid = UniqueIdGenerator.getNextId();
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      // We are outputting elements with timestamp of 0 to prevent reliance on 
event time. This
+      // limits the ability to window on commit time of any data changes. It 
is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);
+    }
+    return ProcessContinuation.resume();
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 147b872dba0..c890193a168 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
 
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,4 +35,13 @@ public class ChangeStreamDao {
     this.dataClient = dataClient;
     this.tableId = tableId;
   }
+
+  /**
+   * Returns the result from GenerateInitialChangeStreamPartitions API.
+   *
+   * @return list of StreamPartition
+   */
+  public List<ByteStringRange> generateInitialChangeStreamPartitions() {
+    return 
dataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
new file mode 100644
index 00000000000..b3f86c9be10
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
+import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DetectNewPartitionsActionTest {
+  @ClassRule
+  public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = 
BigtableEmulatorRule.create();
+
+  @Mock private ChangeStreamMetrics metrics;
+  @Mock private GenerateInitialPartitionsAction 
generateInitialPartitionsAction;
+
+  private DetectNewPartitionsAction action;
+
+  @Mock private RestrictionTracker<OffsetRange, Long> tracker;
+  @Mock private OutputReceiver<PartitionRecord> receiver;
+  @Mock private BundleFinalizer bundleFinalizer;
+
+  private MetadataTableDao metadataTableDao;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+  private Timestamp startTime;
+  private Timestamp endTime;
+  private static BigtableDataClient dataClient;
+  private static BigtableTableAdminClient adminClient;
+
+  @Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    BigtableTableAdminSettings adminSettings =
+        
BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    adminClient = BigtableTableAdminClient.create(adminSettings);
+    BigtableDataSettings dataSettingsBuilder =
+        
BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    dataClient = BigtableDataClient.create(dataSettingsBuilder);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    String changeStreamId = UniqueIdGenerator.generateRowKeyPrefix();
+    MetadataTableAdminDao metadataTableAdminDao =
+        new MetadataTableAdminDao(
+            adminClient, null, changeStreamId, 
MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
+    metadataTableAdminDao.createMetadataTable();
+    metadataTableDao =
+        new MetadataTableDao(
+            dataClient,
+            metadataTableAdminDao.getTableId(),
+            metadataTableAdminDao.getChangeStreamNamePrefix());
+
+    startTime = Timestamp.now();
+    endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, 
startTime.getNanos());
+    action =
+        new DetectNewPartitionsAction(
+            metrics, metadataTableDao, endTime, 
generateInitialPartitionsAction);
+    watermarkEstimator = new 
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
+  }
+
+  @Test
+  public void testInitialPartitions() throws Exception {
+    OffsetRange offsetRange = new OffsetRange(0, Long.MAX_VALUE);
+    when(tracker.currentRestriction()).thenReturn(offsetRange);
+    assertEquals(TimestampConverter.toInstant(startTime), 
watermarkEstimator.currentWatermark());
+
+    when(generateInitialPartitionsAction.run(receiver, tracker, 
watermarkEstimator, startTime))
+        .thenReturn(ProcessContinuation.resume());
+
+    assertEquals(
+        ProcessContinuation.resume(),
+        action.run(tracker, receiver, watermarkEstimator, bundleFinalizer, 
startTime));
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
new file mode 100644
index 00000000000..c407ec70e63
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
+import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class GenerateInitialPartitionsActionTest {
+  @ClassRule
+  public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = 
BigtableEmulatorRule.create();
+
+  @Mock private RestrictionTracker<OffsetRange, Long> tracker;
+  @Mock private ChangeStreamDao changeStreamDao;
+  @Mock private ChangeStreamMetrics metrics;
+  @Mock private OutputReceiver<PartitionRecord> receiver;
+
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+  private Timestamp startTime;
+  private Timestamp endTime;
+  private static BigtableTableAdminClient adminClient;
+
+  @Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    BigtableTableAdminSettings adminSettings =
+        
BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+            .setProjectId("fake-project")
+            .setInstanceId("fake-instance")
+            .build();
+    adminClient = BigtableTableAdminClient.create(adminSettings);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    String changeStreamId = UniqueIdGenerator.generateRowKeyPrefix();
+    MetadataTableAdminDao metadataTableAdminDao =
+        new MetadataTableAdminDao(
+            adminClient, null, changeStreamId, 
MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
+    metadataTableAdminDao.createMetadataTable();
+    startTime = Timestamp.now();
+    endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, 
startTime.getNanos());
+    watermarkEstimator = new 
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
+  }
+
+  @Test
+  public void testGenerateInitialPartitionsFromStartTime() {
+    when(tracker.tryClaim(0L)).thenReturn(true);
+
+    ByteStringRange partition1 = ByteStringRange.create("", "b");
+    ByteStringRange partition2 = ByteStringRange.create("b", "");
+    List<ByteStringRange> partitionRecordList = Arrays.asList(partition1, 
partition2);
+    
when(changeStreamDao.generateInitialChangeStreamPartitions()).thenReturn(partitionRecordList);
+
+    GenerateInitialPartitionsAction generateInitialPartitionsAction =
+        new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+    assertEquals(
+        ProcessContinuation.resume(),
+        generateInitialPartitionsAction.run(receiver, tracker, 
watermarkEstimator, startTime));
+    verify(receiver, times(2))
+        .outputWithTimestamp(partitionRecordArgumentCaptor.capture(), 
eq(Instant.EPOCH));
+    List<PartitionRecord> actualPartitions = 
partitionRecordArgumentCaptor.getAllValues();
+
+    assertEquals(partition1, actualPartitions.get(0).getPartition());
+    assertEquals(startTime, actualPartitions.get(0).getStartTime());
+    assertEquals(partition2, actualPartitions.get(1).getPartition());
+    assertEquals(startTime, actualPartitions.get(1).getStartTime());
+  }
+}

Reply via email to