gyfora commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1600288388


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
     }
 
-    private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-            throws Exception {
+    private void updateKafkaPulsarSourceMaxParallelisms(
+            Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
-            var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+            Pattern partitionRegex =
+                    Pattern.compile(
+                            
"^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                                    + 
"|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
                     var sourceVertex = vertexInfo.getId();
                     var numPartitions =
                             queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-                                    .filter(partitionRegex.asMatchPredicate())
-                                    .count();
+                                    .map(
+                                            v -> {
+                                                Matcher matcher = 
partitionRegex.matcher(v);
+                                                if (matcher.matches()) {
+                                                    String kafkaTopic = 
matcher.group("kafkaTopic");
+                                                    String kafkaId = 
matcher.group("kafkaId");
+                                                    String pulsarTopic =
+                                                            
matcher.group("pulsarTopic");
+                                                    String pulsarId = 
matcher.group("pulsarId");
+                                                    return kafkaTopic != null
+                                                            ? kafkaTopic + "-" 
+ kafkaId
+                                                            : pulsarTopic + 
"-" + pulsarId;

Review Comment:
   here again I would prefer to completely separate the logic for the 
pulser/kafka counting as the logic doesn't really overlap and this is just more 
confusing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to