fapaul commented on a change in pull request #16838:
URL: https://github.com/apache/flink/pull/16838#discussion_r689378794



##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
##########
@@ -203,10 +203,12 @@ public static void waitUtil(Supplier<Boolean> condition, Duration timeout, Strin
             throw new IllegalArgumentException("The timeout must be positive.");
         }
         long startingTime = System.currentTimeMillis();
-        while (!condition.get() && System.currentTimeMillis() - startingTime < timeoutMs) {
+        boolean conditionResult = false;

Review comment:
    ```suggestion
        boolean conditionResult = condition.get();
    ```
    Is it necessary to do one loop here? With `conditionResult = false` we always enter the loop once, even if the condition already holds.
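
    For context, a minimal self-contained sketch of the method with the suggestion applied (the class name and the sleep/throw details are assumptions for illustration, not the exact Flink source):

    ```java
    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    public class WaitUtilSketch {

        /** Waits until the condition holds or the timeout elapses, then fails with errorMsg. */
        public static void waitUntil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
                throws InterruptedException, TimeoutException {
            long timeoutMs = timeout.toMillis();
            if (timeoutMs <= 0) {
                throw new IllegalArgumentException("The timeout must be positive.");
            }
            long startingTime = System.currentTimeMillis();
            // Evaluate once up front, as the suggestion proposes: if the
            // condition already holds, the loop body is never entered.
            boolean conditionResult = condition.get();
            while (!conditionResult && System.currentTimeMillis() - startingTime < timeoutMs) {
                Thread.sleep(1L);
                conditionResult = condition.get();
            }
            if (!conditionResult) {
                throw new TimeoutException(errorMsg);
            }
        }
    }
    ```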

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -161,6 +171,13 @@ public KafkaPartitionSplitReader(
                                 finishedPartitions,
                                 recordsBySplits);
                     }
+                    // Update I/O metric
+                    sourceReaderMetricGroup
+                            .getIOMetricGroup()
+                            .getNumBytesInCounter()
+                            .inc(

Review comment:
    I don't have much experience with how Kafka records are structured, but did you also consider counting the metadata fields?
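
    To make the question concrete, here is a small plain-Java sketch (a hypothetical helper, not the connector code) contrasting counting only key + value bytes with also counting record-header bytes, i.e. the metadata fields in question:

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class RecordSizeSketch {

        /** Bytes the patch as written would count: serialized key and value only. */
        public static long keyValueBytes(byte[] key, byte[] value) {
            return (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
        }

        /** Bytes including record headers, modeled here as a simple name-to-bytes map. */
        public static long withHeaderBytes(byte[] key, byte[] value, Map<String, byte[]> headers) {
            long bytes = keyValueBytes(key, value);
            for (Map.Entry<String, byte[]> header : headers.entrySet()) {
                bytes += header.getKey().getBytes(StandardCharsets.UTF_8).length
                        + (header.getValue() == null ? 0 : header.getValue().length);
            }
            return bytes;
        }
    }
    ```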

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -414,6 +431,30 @@ private void maybeRegisterKafkaConsumerMetrics(
         }
     }
 
+    private void registerPendingRecordsMetric(
+            SourceReaderMetricGroup metricGroup, KafkaConsumer<?, ?> consumer) {
+
+        final String recordsLagMaxName = "records-lag-max";

Review comment:
    I wonder if this is the correct metric to track. AFAIK this metric describes the following:
    `The maximum lag in terms of number of records for any partition in this window`, so every subtask will show the same lag value.

    I know there is a similar metric, `records-lag`, which is defined per partition. One could compute the sum over all currently assigned partitions to get a proper lag value.

    Unfortunately, the last time I looked into these metrics I vaguely remember that the _per-partition_ metrics are only registered after at least one record was consumed.
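
    The per-partition aggregation described above could be sketched like this (plain Java; the map-based lookup stands in for the actual Kafka metrics registry and is purely illustrative):

    ```java
    import java.util.Map;
    import java.util.Objects;

    public class PendingRecordsSketch {

        /**
         * Sums the per-partition "records-lag" values of the currently assigned
         * partitions. A null entry models a per-partition metric that is not
         * registered yet, e.g. before the first record was consumed.
         */
        public static long totalPendingRecords(Map<String, Long> perPartitionLag) {
            return perPartitionLag.values().stream()
                    .filter(Objects::nonNull)
                    .mapToLong(Long::longValue)
                    .sum();
        }
    }
    ```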

##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
##########
@@ -203,10 +203,12 @@ public static void waitUtil(Supplier<Boolean> condition, Duration timeout, Strin
             throw new IllegalArgumentException("The timeout must be positive.");
         }
         long startingTime = System.currentTimeMillis();
-        while (!condition.get() && System.currentTimeMillis() - startingTime < timeoutMs) {
+        boolean conditionResult = false;

Review comment:
       This also seems like a bugfix commit :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

