pnowojski commented on code in PR #27692:
URL: https://github.com/apache/flink/pull/27692#discussion_r2976327076
##########
flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java:
##########
@@ -269,39 +298,136 @@ private Map<Metric, Long> takeLastValueSnapshots() {
return map;
}
- private @Nullable CompletableResultCode lastResult;
+ private volatile @Nullable CompletableFuture<Void> lastReportFuture;
@Override
public void report() {
- Collection<MetricData> metricData = collectAllMetrics();
+ final List<MetricData> metricData = List.copyOf(collectAllMetrics());
+ final int totalMetrics = metricData.size();
+ if (totalMetrics == 0) {
+ return;
+ }
+
+ // In order to avoid potentially large memory allocations on
`.partition()` call.
+ // doesn't require additional synchronized as it comes after
collectAllMetrics, which
+ // is synchronized
+ final int localBatchSize = Math.min(batchSize, totalMetrics);
+
+ final Iterable<List<MetricData>> batches = Lists.partition(metricData,
localBatchSize);
+ final int totalBatches = (totalMetrics + localBatchSize - 1) /
localBatchSize;
+ final List<CompletableResultCode> results = new
ArrayList<>(totalBatches);
+ for (final List<MetricData> batch : batches) {
+ results.add(exportBatch(batch));
+ }
Review Comment:
Aren't we missing handling of stale metics?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]