jerrypeng commented on a change in pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#discussion_r583241360



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -1143,6 +1178,145 @@ public void testPulsarSourceStatsWithUrl() throws 
Exception {
                 fileServer.getAddress().getPort());
         testPulsarSourceStats(jarFilePathUrl);
     }
+    
+    private void testPulsarBatchSourceStats(String jarFilePathUrl) throws 
Exception {
+       final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String sourceName = "PulsarBatchSource";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        SourceConfig sourceConfig = createSourceConfig(tenant, 
namespacePortion, sourceName, sinkTopic);
+        sourceConfig.setBatchSourceConfig(createBatchSourceConfig());
+        
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() 
== 1);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+
+        final String sinkTopic2 = "persistent://" + replNamespace + "/output-" 
+ sourceName;
+        sourceConfig.setTopicName(sinkTopic2);
+        admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+                return sourceStats.publishers.size() == 1
+                        && sourceStats.publishers.get(0).metadata != null
+                        && 
sourceStats.publishers.get(0).metadata.containsKey("id")
+                        && 
sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s",
 tenant, namespacePortion, sourceName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+        assertEquals(sourceStats.publishers.size(), 1);
+        assertNotNull(sourceStats.publishers.get(0).metadata);
+        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), 
String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic2).publishers.size() 
== 1) && (admin.topics().getInternalStats(sinkTopic2, false).numberOfEntries > 
4);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
+
+        String prometheusMetrics = 
getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        log.info("prometheusMetrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_source_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
+        assertEquals(m.tags.get("fqfn"), 
FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+

Review comment:
       Please also delete the source at the end of test




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to