This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed14f21de94 [feat][broker] PIP-264: Add replication subscription stats (#23026)
ed14f21de94 is described below

commit ed14f21de94c6af2bfae5318a014c56fac8a1a21
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Aug 30 10:00:54 2024 -0700

    [feat][broker] PIP-264: Add replication subscription stats (#23026)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  3 +
 .../ReplicatedSubscriptionsController.java         | 24 ++++++--
 .../ReplicatedSubscriptionsSnapshotBuilder.java    | 14 +++--
 .../OpenTelemetryReplicatedSubscriptionStats.java  | 72 ++++++++++++++++++++++
 .../broker/service/ReplicatedSubscriptionTest.java | 19 +++++-
 .../ReplicatedSubscriptionConfigTest.java          |  9 ++-
 ...ReplicatedSubscriptionsSnapshotBuilderTest.java | 35 ++++++-----
 7 files changed, 146 insertions(+), 30 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9e147517ac7..0b994c640a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -114,6 +114,7 @@ import 
org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
@@ -265,6 +266,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private OpenTelemetryConsumerStats openTelemetryConsumerStats;
     private OpenTelemetryProducerStats openTelemetryProducerStats;
     private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
+    private OpenTelemetryReplicatedSubscriptionStats 
openTelemetryReplicatedSubscriptionStats;
     private OpenTelemetryTransactionCoordinatorStats 
openTelemetryTransactionCoordinatorStats;
     private OpenTelemetryTransactionPendingAckStoreStats 
openTelemetryTransactionPendingAckStoreStats;
 
@@ -861,6 +863,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
             openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
             openTelemetryReplicatorStats = new 
OpenTelemetryReplicatorStats(this);
+            openTelemetryReplicatedSubscriptionStats = new 
OpenTelemetryReplicatedSubscriptionStats(this);
 
             localMetadataSynchronizer = 
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                     ? new PulsarMetadataEventSynchronizer(this, 
config.getMetadataSyncEventTopic())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index a8e6885525a..b873bc93cd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
 import org.apache.pulsar.common.api.proto.ClusterMessageId;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -49,6 +50,7 @@ import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest
 import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 /**
  * Encapsulate all the logic of replicated subscriptions tracking for a given 
topic.
@@ -70,19 +72,25 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
     private final ConcurrentMap<String, 
ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots =
             new ConcurrentHashMap<>();
 
+    @PulsarDeprecatedMetric(
+            newMetricName = 
OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
+    @Deprecated
     private static final Gauge pendingSnapshotsMetric = Gauge
             .build("pulsar_replicated_subscriptions_pending_snapshots",
                     "Counter of currently pending snapshots")
             .register();
 
+    private final OpenTelemetryReplicatedSubscriptionStats stats;
+
     public ReplicatedSubscriptionsController(PersistentTopic topic, String 
localCluster) {
         this.topic = topic;
         this.localCluster = localCluster;
-        timer = topic.getBrokerService().pulsar().getExecutor()
+        var pulsar = topic.getBrokerService().pulsar();
+        timer = pulsar.getExecutor()
                 
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::startNewSnapshot), 0,
-                        topic.getBrokerService().pulsar().getConfiguration()
-                                
.getReplicatedSubscriptionsSnapshotFrequencyMillis(),
+                        
pulsar.getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(),
                         TimeUnit.MILLISECONDS);
+        stats = pulsar.getOpenTelemetryReplicatedSubscriptionStats();
     }
 
     public void receivedReplicatedSubscriptionMarker(Position position, int 
markerType, ByteBuf payload) {
@@ -233,11 +241,11 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
         }
 
         pendingSnapshotsMetric.inc();
+        stats.recordSnapshotStarted();
         ReplicatedSubscriptionsSnapshotBuilder builder = new 
ReplicatedSubscriptionsSnapshotBuilder(this,
                 topic.getReplicators().keys(), 
topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
         pendingSnapshots.put(builder.getSnapshotId(), builder);
         builder.start();
-
     }
 
     public Optional<String> getLastCompletedSnapshotId() {
@@ -254,6 +262,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
                 }
 
                 pendingSnapshotsMetric.dec();
+                var latencyMillis = entry.getValue().getDurationMillis();
+                stats.recordSnapshotTimedOut(latencyMillis);
                 it.remove();
             }
         }
@@ -261,11 +271,15 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
 
     void snapshotCompleted(String snapshotId) {
         ReplicatedSubscriptionsSnapshotBuilder snapshot = 
pendingSnapshots.remove(snapshotId);
-        pendingSnapshotsMetric.dec();
         lastCompletedSnapshotId = snapshotId;
 
         if (snapshot != null) {
             lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();
+
+            pendingSnapshotsMetric.dec();
+            var latencyMillis = snapshot.getDurationMillis();
+            
ReplicatedSubscriptionsSnapshotBuilder.SNAPSHOT_METRIC.observe(latencyMillis);
+            stats.recordSnapshotCompleted(latencyMillis);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
index 4eb20f02907..0dacade3eed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
@@ -30,9 +30,11 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats;
 import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
 import 
org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 @Slf4j
 public class ReplicatedSubscriptionsSnapshotBuilder {
@@ -52,11 +54,13 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
 
     private final Clock clock;
 
-    private static final Summary snapshotMetric = 
Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME)
+    @Deprecated
+    public static final Summary SNAPSHOT_METRIC = 
Summary.build("pulsar_replicated_subscriptions_snapshot_ms",
             "Time taken to create a consistent snapshot across 
clusters").register();
 
     public 
ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController 
controller,
-            List<String> remoteClusters, ServiceConfiguration conf, Clock 
clock) {
+                                                  List<String> remoteClusters, 
ServiceConfiguration conf, Clock clock) {
         this.snapshotId = UUID.randomUUID().toString();
         this.controller = controller;
         this.remoteClusters = remoteClusters;
@@ -123,8 +127,6 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
                         p.getLedgerId(), p.getEntryId(), responses));
         controller.snapshotCompleted(snapshotId);
 
-        double latencyMillis = clock.millis() - startTimeMillis;
-        snapshotMetric.observe(latencyMillis);
     }
 
     boolean isTimedOut() {
@@ -134,4 +136,8 @@ public class ReplicatedSubscriptionsSnapshotBuilder {
     long getStartTimeMillis() {
         return startTimeMillis;
     }
+
+    long getDurationMillis() {
+        return clock.millis() - startTimeMillis;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java
new file mode 100644
index 00000000000..55982eba243
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryReplicatedSubscriptionStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.stats.MetricsUtil;
+
+public class OpenTelemetryReplicatedSubscriptionStats {
+
+    public static final AttributeKey<String> SNAPSHOT_OPERATION_RESULT =
+            
AttributeKey.stringKey("pulsar.replication.subscription.snapshot.operation.result");
+    public enum SnapshotOperationResult {
+        SUCCESS,
+        TIMEOUT;
+        private final Attributes attributes = 
Attributes.of(SNAPSHOT_OPERATION_RESULT, name().toLowerCase());
+    }
+
+    public static final String SNAPSHOT_OPERATION_COUNT_METRIC_NAME =
+            "pulsar.broker.replication.subscription.snapshot.operation.count";
+    private final LongCounter snapshotOperationCounter;
+
+    public static final String SNAPSHOT_DURATION_METRIC_NAME =
+            
"pulsar.broker.replication.subscription.snapshot.operation.duration";
+    private final DoubleHistogram snapshotDuration;
+
+    public OpenTelemetryReplicatedSubscriptionStats(PulsarService pulsar) {
+        var meter = pulsar.getOpenTelemetry().getMeter();
+        snapshotOperationCounter = 
meter.counterBuilder(SNAPSHOT_OPERATION_COUNT_METRIC_NAME)
+                .setDescription("The number of snapshot operations attempted")
+                .setUnit("{operation}")
+                .build();
+        snapshotDuration = 
meter.histogramBuilder(SNAPSHOT_DURATION_METRIC_NAME)
+                .setDescription("Time taken to complete a consistent snapshot 
operation across clusters")
+                .setUnit("s")
+                .build();
+    }
+
+    public void recordSnapshotStarted() {
+        snapshotOperationCounter.add(1);
+    }
+
+    public void recordSnapshotTimedOut(long durationMs) {
+        snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, 
TimeUnit.MILLISECONDS),
+                SnapshotOperationResult.TIMEOUT.attributes);
+    }
+
+    public void recordSnapshotCompleted(long durationMs) {
+        snapshotDuration.record(MetricsUtil.convertToSeconds(durationMs, 
TimeUnit.MILLISECONDS),
+                SnapshotOperationResult.SUCCESS.attributes);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index e5aad47dc89..4273e8bbaeb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static 
org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_DURATION_METRIC_NAME;
+import static 
org.apache.pulsar.broker.stats.OpenTelemetryReplicatedSubscriptionStats.SNAPSHOT_OPERATION_COUNT_METRIC_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -26,7 +30,8 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
-
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
@@ -141,7 +146,6 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
                 producer.send(body.getBytes(StandardCharsets.UTF_8));
                 sentMessages.add(body);
             }
-            producer.close();
         }
 
         Set<String> receivedMessages = new LinkedHashSet<>();
@@ -170,6 +174,17 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
         // assert that all messages have been received
         assertEquals(new ArrayList<>(sentMessages), new 
ArrayList<>(receivedMessages), "Sent and received " +
                 "messages don't match.");
+
+        var metrics1 = metricReader1.collectAllMetrics();
+        assertMetricLongSumValue(metrics1, 
SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
+                Attributes.empty(),value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics1, 
SNAPSHOT_OPERATION_COUNT_METRIC_NAME,
+                Attributes.empty(), value -> assertThat(value).isPositive());
+        assertThat(metrics1)
+                .anySatisfy(metric -> 
OpenTelemetryAssertions.assertThat(metric)
+                        .hasName(SNAPSHOT_DURATION_METRIC_NAME)
+                        .hasHistogramSatisfying(histogram -> 
histogram.hasPointsSatisfying(
+                                histogramPoint -> 
histogramPoint.hasSumGreaterThan(0.0))));
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
index aa0015742f6..604326203e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-
 import lombok.Cleanup;
-
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
@@ -48,6 +47,12 @@ public class ReplicatedSubscriptionConfigTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     @Test
     public void createReplicatedSubscription() throws Exception {
         this.conf.setEnableReplicatedSubscriptions(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
index f5c3bb9d75b..562c5eda581 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilderTest.java
@@ -25,11 +25,8 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
-
 import java.time.Clock;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -71,7 +68,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
             Commands.skipMessageMetadata(marker);
             markers.add(marker);
             return null;
-        }).when(controller)
+        })
+                .when(controller)
                 .writeMarker(any(ByteBuf.class));
     }
 
@@ -80,7 +78,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         List<String> remoteClusters = Collections.singletonList("b");
 
         ReplicatedSubscriptionsSnapshotBuilder builder = new 
ReplicatedSubscriptionsSnapshotBuilder(controller,
-                remoteClusters, conf, clock);
+                remoteClusters,
+                conf, clock);
 
         assertTrue(markers.isEmpty());
 
@@ -93,8 +92,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         assertEquals(request.getSourceCluster(), localCluster);
 
         // Simulate the responses coming back
-        ReplicatedSubscriptionsSnapshotResponse response = new 
ReplicatedSubscriptionsSnapshotResponse()
-                .setSnapshotId("snapshot-1");
+        ReplicatedSubscriptionsSnapshotResponse response = new 
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+                "snapshot-1");
         response.setCluster()
                 .setCluster("b")
                 .setMessageId()
@@ -119,7 +118,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         List<String> remoteClusters = Arrays.asList("b", "c");
 
         ReplicatedSubscriptionsSnapshotBuilder builder = new 
ReplicatedSubscriptionsSnapshotBuilder(controller,
-                remoteClusters, conf, clock);
+                remoteClusters,
+                conf, clock);
 
         assertTrue(markers.isEmpty());
 
@@ -132,8 +132,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         assertEquals(request.getSourceCluster(), localCluster);
 
         // Simulate the responses coming back
-        ReplicatedSubscriptionsSnapshotResponse response1 = new 
ReplicatedSubscriptionsSnapshotResponse()
-                .setSnapshotId("snapshot-1");
+        ReplicatedSubscriptionsSnapshotResponse response1 = new 
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+                "snapshot-1");
         response1.setCluster()
                 .setCluster("b")
                 .setMessageId()
@@ -144,8 +144,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         // No markers should be sent out
         assertTrue(markers.isEmpty());
 
-        ReplicatedSubscriptionsSnapshotResponse response2 = new 
ReplicatedSubscriptionsSnapshotResponse()
-                .setSnapshotId("snapshot-1");
+        ReplicatedSubscriptionsSnapshotResponse response2 = new 
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+                "snapshot-1");
         response2.setCluster()
                 .setCluster("c")
                 .setMessageId()
@@ -159,8 +159,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         assertEquals(request.getSourceCluster(), localCluster);
 
         // Responses coming back
-        ReplicatedSubscriptionsSnapshotResponse response3 = new 
ReplicatedSubscriptionsSnapshotResponse()
-                .setSnapshotId("snapshot-1");
+        ReplicatedSubscriptionsSnapshotResponse response3 = new 
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+                "snapshot-1");
         response3.setCluster()
                 .setCluster("b")
                 .setMessageId()
@@ -171,8 +171,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         // No markers should be sent out
         assertTrue(markers.isEmpty());
 
-        ReplicatedSubscriptionsSnapshotResponse response4 = new 
ReplicatedSubscriptionsSnapshotResponse()
-                .setSnapshotId("snapshot-1");
+        ReplicatedSubscriptionsSnapshotResponse response4 = new 
ReplicatedSubscriptionsSnapshotResponse().setSnapshotId(
+                "snapshot-1");
         response4.setCluster()
                 .setCluster("c")
                 .setMessageId()
@@ -201,7 +201,8 @@ public class ReplicatedSubscriptionsSnapshotBuilderTest {
         List<String> remoteClusters = Collections.singletonList("b");
 
         ReplicatedSubscriptionsSnapshotBuilder builder = new 
ReplicatedSubscriptionsSnapshotBuilder(controller,
-                remoteClusters, conf, clock);
+                remoteClusters,
+                conf, clock);
 
         assertFalse(builder.isTimedOut());
 

Reply via email to