imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r811614446



##########
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
##########
@@ -44,32 +47,198 @@
 import org.apache.flink.util.UserCodeClassLoader;
 
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collection;
 import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.base.DeliveryGuarantee.AT_LEAST_ONCE;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static 
org.apache.flink.connector.pulsar.sink.writer.metrics.PulsarProducerMetricsRegister.PRODUCER_UPDATE_POLLING_INTERVAL_MILLIS;
 import static 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unit tests for {@link PulsarWriter}. */
 class PulsarWriterTest extends PulsarTestSuiteBase {
-
     private static final SinkWriter.Context CONTEXT = new 
MockSinkWriterContext();
+    private static final String DEFAULT_TEST_PRODUCER_NAME = "test-producer";
+    private static final String EXPECTED_ALL_PRODUCER_METRIC_NAME = 
"PulsarSink.numAcksReceived";
+    private static final String EXPECTED_PER_PRODUCER_METRIC_PREFIX = 
"PulsarSink.producer";
+    private static final String EXPECTED_PER_PRODUCER_METRIC_NAME = 
"sendLatency99Pct";
+    private static final String GLOBAL_MAX_LATENCY_METRIC_NAME = 
"PulsarSink.sendLatencyMax";
+    private static final String CURRENT_SEND_TIME_METRIC_NAME = 
"currentSendTime";
+    private static final String NUM_ACKS_RECEIVED_METRIC_NAME = 
"PulsarSink.numAcksReceived";
+    private static final long TEST_PULSAR_STATS_INTERVAL_SECONDS = 1L;
+
+    private MetricListener metricListener;
+    private TestProcessingTimeService timeService;
+    private MockInitContext initContext;
+
+    @BeforeEach
+    void setup() throws Exception {
+        metricListener = new MetricListener();
+        timeService = new TestProcessingTimeService();
+        timeService.setCurrentTime(0L);
+        initContext = new MockInitContext(metricListener, timeService);
+    }
 
     @Test
     void writeMessageWithGuarantee() throws Exception {
         writeMessageWithoutGuarantee(EXACTLY_ONCE);
     }
 
+    @Test
+    void metricsPresentAfterWriterCreated() throws Exception {
+        String topic = randomAlphabetic(10);
+        operator().createTopic(topic, 8);
+
+        try (final PulsarWriter<String> ignored = createWriter(topic, 
initContext)) {
+            
assertThat(metricListener.getCounter(EXPECTED_ALL_PRODUCER_METRIC_NAME)).isPresent();
+        }
+    }
+
+    @Test
+    void perProducerMetricsPresentAfterMessageWritten() throws Exception {
+        String topic = randomAlphabetic(10);
+        operator().createTopic(topic, 8);
+
+        try (final PulsarWriter<String> writer = createWriter(topic, 
initContext)) {
+            String message = randomAlphabetic(10);
+            sendMessages(writer, 1);
+            // advance timer to update the producers
+            timeService.advance(PRODUCER_UPDATE_POLLING_INTERVAL_MILLIS + 
1000);
+            assertThat(metricListener.getGauge(perProducerMetricName(topic, 
0))).isPresent();
+
+            // send another message and advance timer
+            sendMessages(writer, 1000);

Review comment:
       The 1000 here is a magic number, it should be equal to the switch size 
of round robin router




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to