This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed14f21de94 [feat][broker] PIP-264: Add replication subscription stats (#23026)
ed14f21de94 is described below
commit ed14f21de94c6af2bfae5318a014c56fac8a1a21
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Aug 30 10:00:54 2024 -0700
[feat][broker] PIP-264: Add replication subscription stats (#23026)
---
.../org/apache/pulsar/broker/PulsarService.java | 3 +
.../ReplicatedSubscriptionsController.java | 24 ++++++--
.../ReplicatedSubscriptionsSnapshotBuilder.java | 14 +++--
.../OpenTelemetryReplicatedSubscriptionStats.java | 72 ++++++++++++++++++++++
.../broker/service/ReplicatedSubscriptionTest.java | 19 +++++-
.../ReplicatedSubscriptionConfigTest.java | 9 ++-
...ReplicatedSubscriptionsSnapshotBuilderTest.java | 35 ++++++-----
7 files changed, 146 insertions(+), 30 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9e147517ac7..0b994c640a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
@@ -265,6 +266,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OpenTelemetryConsumerStats openTelemetryConsumerStats;
private OpenTelemetryProducerStats openTelemetryProducerStats;
private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
+ private OpenTelemetryReplicatedSubscriptionStats
openTelemetryReplicatedSubscriptionStats;
private OpenTelemetryTransactionCoordinatorStats
openTelemetryTransactionCoordinatorStats;
private OpenTelemetryTransactionPendingAckStoreStats
openTelemetryTransactionPendingAckStoreStats;
@@ -861,6 +863,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
openTelemetryReplicatorStats = new
OpenTelemetryReplicatorStats(this);
+ openTelemetryReplicatedSubscriptionStats = new
OpenTelemetryReplicatedSubscriptionStats(this);
localMetadataSynchronizer =
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this,
config.getMetadataSyncEventTopic())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index a8e6885525a..b873bc93cd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.common.api.proto.ClusterMessageId;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -49,6 +50,7 @@ import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest
import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
/**
* Encapsulate all the logic of replicated subscriptions tracking for a given
topic.
@@ -70,19 +72,25 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
private final ConcurrentMap<String,
ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots =
new ConcurrentHashMap<>();
+ @PulsarDeprecatedMetric(
+ newMetricName =
OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
+ @Deprecated
private static final Gauge pendingSnapshotsMetric = Gauge
.build("pulsar_replicated_subscriptions_pending_snapshots",
"Counter of currently pending snapshots")
.register();
+ private final OpenTelemetryReplicatedSubscriptionStats stats;
+
public ReplicatedSubscriptionsController(PersistentTopic topic, String
localCluster) {
this.topic = topic;
this.localCluster = localCluster;
- timer = topic.getBrokerService().pulsar().getExecutor()
+ var pulsar = topic.getBrokerService().pulsar();
+ timer = pulsar.getExecutor()
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::startNewSnapshot), 0,
- topic.getBrokerService().pulsar().getConfiguration()
-
.getReplicatedSubscriptionsSnapshotFrequencyMillis(),
+
pulsar.getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(),
TimeUnit.MILLISECONDS);
+ stats = pulsar.getOpenTelemetryReplicatedSubscriptionStats();
}
public void receivedReplicatedSubscriptionMarker(Position position, int
markerType, ByteBuf payload) {
@@ -233,11 +241,11 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
}
pendingSnapshotsMetric.inc();
+ stats.recordSnapshotStarted();
ReplicatedSubscriptionsSnapshotBuilder builder = new
ReplicatedSubscriptionsSnapshotBuilder(this,
topic.getReplicators().keys(),
topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
pendingSnapshots.put(builder.getSnapshotId(), builder);
builder.start();
-
}
public Optional<String> getLastCompletedSnapshotId() {
@@ -254,6 +262,8 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
}
pendingSnapshotsMetric.dec();
+ var latencyMillis = entry.getValue().getDurationMillis();
+ stats.recordSnapshotTimedOut(latencyMillis);
it.remove();
}
}
@@ -261,11 +271,15 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
void snapshotCompleted(String snapshotId) {
ReplicatedSubscriptionsSnapshotBuilder snapshot =
pendingSnapshots.remove(snapshotId);
- pendingSnapshotsMetric.dec();
lastCompletedSnapshotId = snapshotId;
if (snapshot != null) {
lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();
+
+ pendingSnapshotsMetric.dec();
+ var latencyMillis = snapshot.getDurationMillis();
+
ReplicatedSubscriptionsSnapshotBuilder.SNAPSHOT_METRIC.observe(latencyMillis);
+ stats.recordSnapshotCompleted(latencyMillis);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
index 4eb20f02907..0dacade3eed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -30,9 +30,11 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
@Slf4j
public class ReplicatedSubscriptionsSnapshotBuilder {
@@ -52,11 +54,13 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
private final Clock clock;
- private static final Summary snapshotMetric =
Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
+ @PulsarDeprecatedMetric(newMetricName =
OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME)
+ @Deprecated
+ public static final Summary SNAPSHOT_METRIC =
Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
"Time taken to create a consistent snapshot across
clusters").register();
public
ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController
controller,
- List<String> remoteClusters, ServiceConfiguration conf, Clock
clock) {
+ List<String> remoteClusters,
ServiceConfiguration conf, Clock clock) {
this.snapshotId = UUID.randomUUID().toString();
this.controller = controller;
this.remoteClusters = remoteClusters;
@@ -123,8 +127,6 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
p.getLedgerId(), p.getEntryId(), responses));
controller.snapshotCompleted(snapshotId);
- double latencyMillis = clock.millis() - startTimeMillis;
- snapshotMetric.observe(latencyMillis);
}
boolean isTimedOut() {
@@ -134,4 +136,8 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
long getStartTimeMillis() {
return startTimeMillis;
}
+
+ long getDurationMillis() {
+ return clock.millis() - startTimeMillis;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java
new file mode 100644
index 00000000000..55982eba243
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.stats.MetricsUtil;
+
+public class OpenTelemetryReplicatedSubscriptionStats {
+
+ public static final AttributeKey<String> SNAPSHOT_OPERATION_RESULT =
+
AttributeKey.stringKey("pulsar.replication.subscription.snapshot.operation.result");
+ public enum SnapshotOperationResult {
+ SUCCESS,
+ TIMEOUT;
+ private final Attributes attributes =
Attributes.of(SNAPSHOT_OPERATION_RESULT, name().toLowerCase());
+ }
+
+ public static final String SNAPSHOT_OPERATION_COUNT_METRIC_NAME =
+ "pulsar.broker.replication.subscription.snapshot.operation.count";
+ private final LongCounter snapshotOperationCounter;
+
+ public static final String SNAPSHOT_DURATION_METRIC_NAME =
+
"pulsar.broker.replication.subscription.snapshot.operation.duration";
+ private final DoubleHistogram snapshotDuration;
+
+ public OpenTelemetryReplicatedSubscriptionStats(PulsarService pulsar) {
+ var meter = pulsar.getOpenTelemetry().getMeter();
+ snapshotOperationCounter =
meter.counterBuilder(SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
+ .setDescription("The number of snapshot operations attempted")
+ .setUnit("{operation}")
+ .build();
+ snapshotDuration =
meter.histogramBuilder(SNAPSHOT_DURATION_METRIC_NAME)
+ .setDescription("Time taken to complete a consistent snapshot
operation across clusters")
+ .setUnit("s")
+ .build();
+ }
+
+ public void recordSnapshotStarted() {
+ snapshotOperationCounter.add(1);
+ }
+
+ public void recordSnapshotTimedOut(long durationMs) {
+ snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs,
TimeUnit.MILLISECONDS),
+ SnapshotOperationResult.TIMEOUT.attributes);
+ }
+
+ public void recordSnapshotCompleted(long durationMs) {
+ snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs,
TimeUnit.MILLISECONDS),
+ SnapshotOperationResult.SUCCESS.attributes);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index e5aad47dc89..4273e8bbaeb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static
org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME;
+import static
org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -26,7 +30,8 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
-
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
@@ -141,7 +146,6 @@ public class ReplicatedSubscriptionTest extends ReplicatorTestBase {
producer.send(body.getBytes(StandardCharsets.UTF_8));
sentMessages.add(body);
}
- producer.close();
}
Set<String> receivedMessages = new LinkedHashSet<>();
@@ -170,6 +174,17 @@ public class ReplicatedSubscriptionTest extends ReplicatorTestBase {
// assert that all messages have been received
assertEquals(new ArrayList<>(sentMessages), new
ArrayList<>(receivedMessages), "Sent and received " +
"messages don't match.");
+
+ var metrics1 = metricReader1.collectAllMetrics();
+ assertMetricLongSumValue(metrics1,
SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
+ Attributes.empty(),value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics1,
SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
+ Attributes.empty(), value -> assertThat(value).isPositive());
+ assertThat(metrics1)
+ .anySatisfy(metric ->
OpenTelemetryAssertions.assertThat(metric)
+ .hasName(SNAPSHOT_DURATION_METRIC_NAME)
+ .hasHistogramSatisfying(histogram ->
histogram.hasPointsSatisfying(
+ histogramPoint ->
histogramPoint.hasSumGreaterThan(0.0))));
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
index aa0015742f6..604326203e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service.persistent;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
import lombok.Cleanup;
-
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
@@ -48,6 +47,12 @@ public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase {
super.internalCleanup();
}
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true);
+ }
+
@Test
public void createReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
index f5c3bb9d75b..562c5eda581 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -25,11 +25,8 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
import io.netty.buffer.ByteBuf;
-
import java.time.Clock;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -71,7 +68,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
Commands.skipMessageMetadata(marker);
markers.add(marker);
return null;
- }).when(controller)
+ })
+ .when(controller)
.writeMarker(any(ByteBuf.class));
}
@@ -80,7 +78,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
List<String> remoteClusters = Collections.singletonList("b");
ReplicatedSubscriptionsSnapshotBuilder builder = new
ReplicatedSubscriptionsSnapshotBuilder(controller,
- remoteClusters, conf, clock);
+ remoteClusters,
+ conf, clock);
assertTrue(markers.isEmpty());
@@ -93,8 +92,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
assertEquals(request.getSourceCluster(), localCluster);
// Simulate the responses coming back
- ReplicatedSubscriptionsSnapshotResponse response = new
ReplicatedSubscriptionsSnapshotResponse()
- .setSnapshotId("snapshot-1");
+ ReplicatedSubscriptionsSnapshotResponse response = new
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+ "snapshot-1");
response.setCluster()
.setCluster("b")
.setMessageId()
@@ -119,7 +118,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
List<String> remoteClusters = Arrays.asList("b", "c");
ReplicatedSubscriptionsSnapshotBuilder builder = new
ReplicatedSubscriptionsSnapshotBuilder(controller,
- remoteClusters, conf, clock);
+ remoteClusters,
+ conf, clock);
assertTrue(markers.isEmpty());
@@ -132,8 +132,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
assertEquals(request.getSourceCluster(), localCluster);
// Simulate the responses coming back
- ReplicatedSubscriptionsSnapshotResponse response1 = new
ReplicatedSubscriptionsSnapshotResponse()
- .setSnapshotId("snapshot-1");
+ ReplicatedSubscriptionsSnapshotResponse response1 = new
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+ "snapshot-1");
response1.setCluster()
.setCluster("b")
.setMessageId()
@@ -144,8 +144,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// No markers should be sent out
assertTrue(markers.isEmpty());
- ReplicatedSubscriptionsSnapshotResponse response2 = new
ReplicatedSubscriptionsSnapshotResponse()
- .setSnapshotId("snapshot-1");
+ ReplicatedSubscriptionsSnapshotResponse response2 = new
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+ "snapshot-1");
response2.setCluster()
.setCluster("c")
.setMessageId()
@@ -159,8 +159,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
assertEquals(request.getSourceCluster(), localCluster);
// Responses coming back
- ReplicatedSubscriptionsSnapshotResponse response3 = new
ReplicatedSubscriptionsSnapshotResponse()
- .setSnapshotId("snapshot-1");
+ ReplicatedSubscriptionsSnapshotResponse response3 = new
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+ "snapshot-1");
response3.setCluster()
.setCluster("b")
.setMessageId()
@@ -171,8 +171,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
// No markers should be sent out
assertTrue(markers.isEmpty());
- ReplicatedSubscriptionsSnapshotResponse response4 = new
ReplicatedSubscriptionsSnapshotResponse()
- .setSnapshotId("snapshot-1");
+ ReplicatedSubscriptionsSnapshotResponse response4 = new
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+ "snapshot-1");
response4.setCluster()
.setCluster("c")
.setMessageId()
@@ -201,7 +201,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
List<String> remoteClusters = Collections.singletonList("b");
ReplicatedSubscriptionsSnapshotBuilder builder = new
ReplicatedSubscriptionsSnapshotBuilder(controller,
- remoteClusters, conf, clock);
+ remoteClusters,
+ conf, clock);
assertFalse(builder.isTimedOut());