Izeren commented on code in PR #27692:
URL: https://github.com/apache/flink/pull/27692#discussion_r2980852811


##########
flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java:
##########
@@ -269,39 +298,136 @@ private Map<Metric, Long> takeLastValueSnapshots() {
         return map;
     }
 
-    private @Nullable CompletableResultCode lastResult;
+    private volatile @Nullable CompletableFuture<Void> lastReportFuture;
 
     @Override
     public void report() {
-        Collection<MetricData> metricData = collectAllMetrics();
+        final List<MetricData> metricData = List.copyOf(collectAllMetrics());
+        final int totalMetrics = metricData.size();
+        if (totalMetrics == 0) {
+            return;
+        }
+
+        // In order to avoid potentially large memory allocations on 
`.partition()` call.
+        // doesn't require additional synchronized as it comes after 
collectAllMetrics, which
+        // is synchronized
+        final int localBatchSize = Math.min(batchSize, totalMetrics);
+
+        final Iterable<List<MetricData>> batches = Lists.partition(metricData, 
localBatchSize);
+        final int totalBatches = (totalMetrics + localBatchSize - 1) / 
localBatchSize;
+        final List<CompletableResultCode> results = new 
ArrayList<>(totalBatches);
+        for (final List<MetricData> batch : batches) {
+            results.add(exportBatch(batch));
+        }

Review Comment:
   That was removed from the FLIP during review



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to