This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by
this push:
new e743ebaf818 Implement generating initial list of partitions (#25411)
e743ebaf818 is described below
commit e743ebaf818039a01d810b96858c4270be05d638
Author: Tony Tang <[email protected]>
AuthorDate: Mon Feb 13 21:37:43 2023 -0500
Implement generating initial list of partitions (#25411)
---
.../changestreams/ChangeStreamMetrics.java | 25 ++++
.../action/DetectNewPartitionsAction.java | 10 +-
.../action/GenerateInitialPartitionsAction.java | 42 ++++++-
.../changestreams/dao/ChangeStreamDao.java | 11 ++
.../action/DetectNewPartitionsActionTest.java | 127 +++++++++++++++++++++
.../GenerateInitialPartitionsActionTest.java | 119 +++++++++++++++++++
6 files changed, 326 insertions(+), 8 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index 2aaa6631ace..f8177adf873 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -18,8 +18,33 @@
package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
import java.io.Serializable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
/** Class to aggregate metrics related functionality. */
public class ChangeStreamMetrics implements Serializable {
private static final long serialVersionUID = 7298901109362981596L;
+ // ------------------------
+ // Partition record metrics
+
+ /**
+ * Counter for the total number of partitions identified during the
execution of the Connector.
+ */
+ public static final Counter LIST_PARTITIONS_COUNT =
+ Metrics.counter(
+
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+ "list_partitions_count");
+
+ /**
+ * Increments the {@link
+ *
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT}
by
+ * 1.
+ */
+ public void incListPartitionsCount() {
+ inc(LIST_PARTITIONS_COUNT);
+ }
+
+ private void inc(Counter counter) {
+ counter.inc();
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 990d9b2fd01..932796d4ba8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -25,7 +25,8 @@ import
org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -93,11 +94,14 @@ public class DetectNewPartitionsAction {
@VisibleForTesting
public ProcessContinuation run(
RestrictionTracker<OffsetRange, Long> tracker,
- DoFn.OutputReceiver<PartitionRecord> receiver,
+ OutputReceiver<PartitionRecord> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator,
- DoFn.BundleFinalizer bundleFinalizer,
+ BundleFinalizer bundleFinalizer,
Timestamp startTime)
throws Exception {
+ if (tracker.currentRestriction().getFrom() == 0L) {
+ return generateInitialPartitionsAction.run(receiver, tracker,
watermarkEstimator, startTime);
+ }
// Terminate if endTime <= watermark that means all partitions have read
up to or beyond
// watermark. We no longer need to manage splits and merges, we can
terminate.
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
index be61c9a9e9b..164a679a26f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
@@ -18,20 +18,30 @@
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to generate first set of outputs for {@link
*
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn}.
*/
-@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
public class GenerateInitialPartitionsAction {
+ private static final Logger LOG =
LoggerFactory.getLogger(GenerateInitialPartitionsAction.class);
+
private final ChangeStreamMetrics metrics;
private final ChangeStreamDao changeStreamDao;
@Nullable private final Timestamp endTime;
@@ -47,12 +57,34 @@ public class GenerateInitialPartitionsAction {
* The very first step of the pipeline when there are no partitions being
streamed yet. We want to
* get an initial list of partitions to stream and output them.
*
- * @return true if this pipeline should continue, otherwise false.
+ * @return {@link ProcessContinuation#stop()} if the initial restriction cannot be claimed,
+ * otherwise {@link ProcessContinuation#resume()}
*/
- public boolean run(
+ public ProcessContinuation run(
OutputReceiver<PartitionRecord> receiver,
+ RestrictionTracker<OffsetRange, Long> tracker,
ManualWatermarkEstimator<Instant> watermarkEstimator,
- Timestamp startTime) {
- return true;
+ com.google.cloud.Timestamp startTime) {
+ if (!tracker.tryClaim(0L)) {
+ LOG.error(
+ "Could not claim initial DetectNewPartition restriction. No
partitions are outputted.");
+ return ProcessContinuation.stop();
+ }
+ List<ByteStringRange> streamPartitions =
+ changeStreamDao.generateInitialChangeStreamPartitions();
+
+ watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+ for (ByteStringRange partition : streamPartitions) {
+ metrics.incListPartitionsCount();
+ String uid = UniqueIdGenerator.getNextId();
+ PartitionRecord partitionRecord =
+ new PartitionRecord(partition, startTime, uid, startTime, endTime);
+ // We are outputting elements with timestamp of 0 to prevent reliance on
event time. This
+ // limits the ability to window on commit time of any data changes. It
is still possible to
+ // window on processing time.
+ receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);
+ }
+ return ProcessContinuation.resume();
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 147b872dba0..c890193a168 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,4 +35,13 @@ public class ChangeStreamDao {
this.dataClient = dataClient;
this.tableId = tableId;
}
+
+ /**
+ * Returns the result from GenerateInitialChangeStreamPartitions API.
+ *
+ * @return the initial stream partitions as a list of {@link ByteStringRange}
+ */
+ public List<ByteStringRange> generateInitialChangeStreamPartitions() {
+ return
dataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
new file mode 100644
index 00000000000..b3f86c9be10
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
+import
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DetectNewPartitionsActionTest {
+ @ClassRule
+ public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE =
BigtableEmulatorRule.create();
+
+ @Mock private ChangeStreamMetrics metrics;
+ @Mock private GenerateInitialPartitionsAction
generateInitialPartitionsAction;
+
+ private DetectNewPartitionsAction action;
+
+ @Mock private RestrictionTracker<OffsetRange, Long> tracker;
+ @Mock private OutputReceiver<PartitionRecord> receiver;
+ @Mock private BundleFinalizer bundleFinalizer;
+
+ private MetadataTableDao metadataTableDao;
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+ private Timestamp startTime;
+ private Timestamp endTime;
+ private static BigtableDataClient dataClient;
+ private static BigtableTableAdminClient adminClient;
+
+ @Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ BigtableTableAdminSettings adminSettings =
+
BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+ .setProjectId("fake-project")
+ .setInstanceId("fake-instance")
+ .build();
+ adminClient = BigtableTableAdminClient.create(adminSettings);
+ BigtableDataSettings dataSettingsBuilder =
+
BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+ .setProjectId("fake-project")
+ .setInstanceId("fake-instance")
+ .build();
+ dataClient = BigtableDataClient.create(dataSettingsBuilder);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ String changeStreamId = UniqueIdGenerator.generateRowKeyPrefix();
+ MetadataTableAdminDao metadataTableAdminDao =
+ new MetadataTableAdminDao(
+ adminClient, null, changeStreamId,
MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
+ metadataTableAdminDao.createMetadataTable();
+ metadataTableDao =
+ new MetadataTableDao(
+ dataClient,
+ metadataTableAdminDao.getTableId(),
+ metadataTableAdminDao.getChangeStreamNamePrefix());
+
+ startTime = Timestamp.now();
+ endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10,
startTime.getNanos());
+ action =
+ new DetectNewPartitionsAction(
+ metrics, metadataTableDao, endTime,
generateInitialPartitionsAction);
+ watermarkEstimator = new
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
+ }
+
+ @Test
+ public void testInitialPartitions() throws Exception {
+ OffsetRange offsetRange = new OffsetRange(0, Long.MAX_VALUE);
+ when(tracker.currentRestriction()).thenReturn(offsetRange);
+ assertEquals(TimestampConverter.toInstant(startTime),
watermarkEstimator.currentWatermark());
+
+ when(generateInitialPartitionsAction.run(receiver, tracker,
watermarkEstimator, startTime))
+ .thenReturn(ProcessContinuation.resume());
+
+ assertEquals(
+ ProcessContinuation.resume(),
+ action.run(tracker, receiver, watermarkEstimator, bundleFinalizer,
startTime));
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
new file mode 100644
index 00000000000..c407ec70e63
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
+import
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class GenerateInitialPartitionsActionTest {
+ @ClassRule
+ public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE =
BigtableEmulatorRule.create();
+
+ @Mock private RestrictionTracker<OffsetRange, Long> tracker;
+ @Mock private ChangeStreamDao changeStreamDao;
+ @Mock private ChangeStreamMetrics metrics;
+ @Mock private OutputReceiver<PartitionRecord> receiver;
+
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+ private Timestamp startTime;
+ private Timestamp endTime;
+ private static BigtableTableAdminClient adminClient;
+
+ @Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ BigtableTableAdminSettings adminSettings =
+
BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort())
+ .setProjectId("fake-project")
+ .setInstanceId("fake-instance")
+ .build();
+ adminClient = BigtableTableAdminClient.create(adminSettings);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ String changeStreamId = UniqueIdGenerator.generateRowKeyPrefix();
+ MetadataTableAdminDao metadataTableAdminDao =
+ new MetadataTableAdminDao(
+ adminClient, null, changeStreamId,
MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
+ metadataTableAdminDao.createMetadataTable();
+ startTime = Timestamp.now();
+ endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10,
startTime.getNanos());
+ watermarkEstimator = new
WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
+ }
+
+ @Test
+ public void testGenerateInitialPartitionsFromStartTime() {
+ when(tracker.tryClaim(0L)).thenReturn(true);
+
+ ByteStringRange partition1 = ByteStringRange.create("", "b");
+ ByteStringRange partition2 = ByteStringRange.create("b", "");
+ List<ByteStringRange> partitionRecordList = Arrays.asList(partition1,
partition2);
+
when(changeStreamDao.generateInitialChangeStreamPartitions()).thenReturn(partitionRecordList);
+
+ GenerateInitialPartitionsAction generateInitialPartitionsAction =
+ new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+ assertEquals(
+ ProcessContinuation.resume(),
+ generateInitialPartitionsAction.run(receiver, tracker,
watermarkEstimator, startTime));
+ verify(receiver, times(2))
+ .outputWithTimestamp(partitionRecordArgumentCaptor.capture(),
eq(Instant.EPOCH));
+ List<PartitionRecord> actualPartitions =
partitionRecordArgumentCaptor.getAllValues();
+
+ assertEquals(partition1, actualPartitions.get(0).getPartition());
+ assertEquals(startTime, actualPartitions.get(0).getStartTime());
+ assertEquals(partition2, actualPartitions.get(1).getPartition());
+ assertEquals(startTime, actualPartitions.get(1).getStartTime());
+ }
+}