Izeren commented on code in PR #28427:
URL: https://github.com/apache/flink/pull/28427#discussion_r3459152569


##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory> 
getRegisteredFileSystemFactories() {
         }
     }
 
+    /**
+     * Hands a runtime-owned, process-level {@link MetricGroup} to every 
registered {@link
+     * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+     *
+     * <p>This is the second phase of file system initialization. {@link 
#initialize(Configuration,
+     * PluginManager)} runs at process startup, before the {@code 
MetricRegistry} exists; this
+     * method is therefore invoked separately, once the registry and a 
process-level {@link
+     * MetricGroup} are available. It is called from the TaskManager and 
JobManager entrypoints
+     * only. Contexts without a process-level {@link MetricGroup} (CLI, 
HistoryServer, YARN client)
+     * simply never call it, and their file system plugins continue to operate 
without emitting
+     * metrics.
+     *
+     * <p>The call is idempotent: factories receive a child group {@code 
<process>.filesystem}, and
+     * {@link MetricGroup#addGroup} returns the same child on repeated calls 
with the same parent,
+     * so re-invocation does not register duplicate metrics. Factories that do 
not implement {@link

Review Comment:
   Will this spam the logs with "duplicate metric group" warnings even more than 
it does now?



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link 
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link 
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin

Review Comment:
   nit: this is one more bloated explanation that could live directly on the 
test case itself. Why do we want to put it in the class docs?



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link 
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link 
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory}, 
which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap 
the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no 
metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+    @AfterEach
+    void resetFileSystems() {
+        // Restore the default, plugin-less factory registry so other tests 
are unaffected.
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @Test
+    void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("metrics-test-fs");
+        initializeWithPlugins(factory);
+
+        RecordingMetricGroup processGroup = new RecordingMetricGroup();
+        FileSystem.attachMetrics(processGroup);
+
+        // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+        assertThat(factory.setMetricGroupCalls).hasValue(1);
+        // The group handed to the factory is the "filesystem" child of the 
process group, not the
+        // process group itself.
+        assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+        
assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+    }
+
+    @Test
+    void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
+        initializeWithPlugins(new PlainFactory("plain-test-fs"));
+
+        assertThatCode(() -> FileSystem.attachMetrics(new 
UnregisteredMetricsGroup()))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void attachMetricsIsResilientToAFactoryThatThrows() {
+        RecordingMetricsAwareFactory ok = new 
RecordingMetricsAwareFactory("ok-test-fs");
+        initializeWithPlugins(new 
ThrowingMetricsAwareFactory("throwing-test-fs"), ok);
+
+        // A misbehaving plugin must never break process startup, and 
well-behaved factories must
+        // still receive their group regardless of iteration order.
+        assertThatCode(() -> FileSystem.attachMetrics(new 
UnregisteredMetricsGroup()))
+                .doesNotThrowAnyException();
+        assertThat(ok.setMetricGroupCalls).hasValue(1);
+    }
+
+    @Test
+    void attachMetricsDoesNotThrowWhenInvokedRepeatedly() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("idem-test-fs");
+        initializeWithPlugins(factory);
+
+        MetricGroup group = new UnregisteredMetricsGroup();
+        assertThatCode(
+                        () -> {
+                            FileSystem.attachMetrics(group);
+                            FileSystem.attachMetrics(group);
+                        })
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void setMetricGroupIsNotInvokedWhenAttachMetricsIsNeverCalled() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("never-test-fs");
+        initializeWithPlugins(factory);
+
+        assertThat(factory.setMetricGroupCalls).hasValue(0);
+    }
+
+    @Test
+    void pluginFileSystemFactoryForwardsMetricGroupToInner() {
+        RecordingMetricsAwareFactory inner = new 
RecordingMetricsAwareFactory("wrapped-fs");
+        FileSystemFactory wrapper = PluginFileSystemFactory.of(inner);
+
+        // The wrapper must itself be MetricsAware so attachMetrics reaches it 
without unwrapping.
+        assertThat(wrapper).isInstanceOf(MetricsAware.class);
+
+        MetricGroup group = new UnregisteredMetricsGroup();
+        ((MetricsAware) wrapper).setMetricGroup(group);
+
+        assertThat(inner.setMetricGroupCalls).hasValue(1);
+        assertThat(inner.receivedGroup.get()).isSameAs(group);
+    }
+
+    private static void initializeWithPlugins(FileSystemFactory... factories) {
+        Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+        plugins.put(FileSystemFactory.class, 
Arrays.asList(factories).iterator());
+        FileSystem.initialize(new Configuration(), new 
TestingPluginManager(plugins));
+    }
+
+    // ------------------------------------------------------------------------
+    //  test factories
+    // ------------------------------------------------------------------------
+
+    private static class PlainFactory implements FileSystemFactory {

Review Comment:
   Could we deduplicate the code here? I feel like a single 
TestingFileSystemFactory could cover all of these cases and take an inline hook 
for the behaviour override, which would make the tests clearer.
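
To illustrate the kind of consolidation meant here, a rough sketch follows. It is self-contained and deliberately does not use Flink's types: `Group` and `MetricsAwareLike` are hypothetical stand-ins for `MetricGroup` and `MetricsAware`, and the constructor shape is only an assumption about how the hook could be passed in. The factory that is not metrics-aware would presumably still need its own type, since one class cannot both implement and not implement the interface.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Sketch only: the nested types are stand-ins, not Flink's real interfaces. */
final class TestingFactorySketch {

    /** Hypothetical stand-in for org.apache.flink.metrics.MetricGroup. */
    interface Group {}

    /** Hypothetical stand-in for the MetricsAware contract from this PR. */
    interface MetricsAwareLike {
        void setMetricGroup(Group group);
    }

    /** One configurable test factory instead of separate recording/throwing variants. */
    static final class TestingFileSystemFactory implements MetricsAwareLike {
        final AtomicInteger setMetricGroupCalls = new AtomicInteger();
        private final String scheme;
        private final Consumer<Group> onSetMetricGroup;

        TestingFileSystemFactory(String scheme, Consumer<Group> onSetMetricGroup) {
            this.scheme = scheme;
            this.onSetMetricGroup = onSetMetricGroup;
        }

        String getScheme() {
            return scheme;
        }

        @Override
        public void setMetricGroup(Group group) {
            // Count the call first so that a throwing hook is still recorded.
            setMetricGroupCalls.incrementAndGet();
            onSetMetricGroup.accept(group);
        }
    }

    public static void main(String[] args) {
        Group group = new Group() {};

        // "Recording" behaviour: the hook does nothing extra.
        TestingFileSystemFactory ok = new TestingFileSystemFactory("ok-test-fs", g -> {});
        ok.setMetricGroup(group);

        // "Throwing" behaviour: the failure is declared inline, next to the test.
        TestingFileSystemFactory throwing =
                new TestingFileSystemFactory(
                        "throwing-test-fs",
                        g -> {
                            throw new IllegalStateException("boom");
                        });
        try {
            throwing.setMetricGroup(group);
        } catch (IllegalStateException expected) {
            System.out.println("hook threw as configured");
        }
        System.out.println(ok.getScheme() + " calls: " + ok.setMetricGroupCalls.get());
    }
}
```

With something along these lines, each test states its behaviour override at the call site, rather than relying on the name of a dedicated factory class.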



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as 
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at 
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry 
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code 
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a 
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and 
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are 
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no 
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and 
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are 
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+    static final String API_CALL_COUNT = "api_call_count";

Review Comment:
   Are these constants expected to be universal across cloud providers? If so, 
should we extract them to the common module like the interface?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -321,9 +325,53 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                                     + "When not set, the default chain is 
used: delegation tokens -> "
                                     + "static credentials (if configured) -> 
DefaultCredentialsProvider.");
 
+    public static final ConfigOption<Boolean> METRICS_ENABLED =
+            ConfigOptions.key("s3.metrics.enabled")

Review Comment:
   Is the suggestion to have these properties replicated across all clouds? I 
am curious whether it would be more generic to move them higher up and 
parametrise them by <scheme>, as we do for some other per-cloud configs.
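
As a rough illustration of the scheme-parametrised shape, similar in spirit to the existing `fs.<scheme>.limit.*` options, here is a minimal sketch. The key pattern `fs.<scheme>.metrics.enabled` and the helper names are hypothetical, not something this PR or Flink defines, and a plain `Map` stands in for Flink's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: the key pattern and helper names are hypothetical. */
final class SchemeScopedOptionsSketch {

    /** Builds a per-scheme key, e.g. "fs.s3.metrics.enabled" for scheme "s3". */
    static String metricsEnabledKey(String scheme) {
        return "fs." + scheme + ".metrics.enabled";
    }

    /** Reads the per-scheme flag from a flat key/value config, falling back to a default. */
    static boolean metricsEnabled(Map<String, String> conf, String scheme, boolean defaultValue) {
        String raw = conf.get(metricsEnabledKey(scheme));
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.s3.metrics.enabled", "false");

        // Explicitly configured for s3, not configured for gs.
        System.out.println("s3: " + metricsEnabled(conf, "s3", true));
        System.out.println("gs: " + metricsEnabled(conf, "gs", true));
    }
}
```

The point is only that one option definition could serve every file system plugin, rather than each plugin declaring its own `*.metrics.enabled` key.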



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as 
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at 
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry 
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code 
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a 
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and 
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are 
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no 
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and 
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are 
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+    static final String API_CALL_COUNT = "api_call_count";
+    static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+    static final String THROTTLE_COUNT = "throttle_count";
+    static final String RETRY_COUNT = "retry_count";
+    static final String IOPS = "iops";
+
+    /** The default-on metric set from FLIP-576. {@code iops} is derived, not 
registered. */
+    static final List<String> DEFAULT_ALLOWLIST =
+            Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, 
THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+    private static final String WILDCARD = "*";
+
+    private static final String LABEL_OP = "op";
+    private static final String LABEL_STATUS_CLASS = "status_class";
+    private static final String LABEL_REASON = "reason";
+
+    private static final String UNKNOWN_OP = "Unknown";
+
+    private final MetricGroup fsScope;
+    private final int histogramWindowSize;
+
+    private final boolean allowAll;
+    private final Set<String> allowlist;
+
+    // op and label sets are closed, so these maps are bounded by construction.
+    private final ConcurrentHashMap<String, Counter> counters = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+            new ConcurrentHashMap<>();
+
+    public AwsSdkMetricBridge(MetricGroup fsScope) {
+        this(fsScope, DEFAULT_ALLOWLIST, 
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+    }
+
+    public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+        this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+    }
+
+    public AwsSdkMetricBridge(
+            MetricGroup fsScope, @Nullable Collection<String> allowlist, int 
histogramWindowSize) {
+        this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not 
be null");
+        Preconditions.checkArgument(
+                histogramWindowSize > 0, "histogramWindowSize must be 
positive");
+        this.histogramWindowSize = histogramWindowSize;
+
+        if (allowlist == null || allowlist.isEmpty()) {
+            LOG.warn(
+                    "S3 metrics allowlist is empty; falling back to the 
default metric set {}",
+                    DEFAULT_ALLOWLIST);
+            this.allowAll = false;
+            this.allowlist = new HashSet<>(DEFAULT_ALLOWLIST);
+        } else if (allowlist.contains(WILDCARD)) {
+            this.allowAll = true;
+            this.allowlist = new HashSet<>();
+        } else {
+            this.allowAll = false;
+            this.allowlist = new HashSet<>(allowlist);
+        }
+    }
+
+    private boolean allowed(String metricName) {
+        return allowAll || allowlist.contains(metricName);
+    }
+
+    @Override
+    public void publish(MetricCollection apiCall) {
+        try {
+            translate(apiCall);
+        } catch (Throwable t) {
+            // Defence in depth: a metric failure must never affect S3 IO.
+            LOG.debug("Failed to publish S3 SDK metrics", t);
+        }
+    }
+
+    private void translate(MetricCollection apiCall) {
+        final String op = first(apiCall, CoreMetric.OPERATION_NAME, 
UNKNOWN_OP);
+
+        final Duration duration = first(apiCall, CoreMetric.API_CALL_DURATION, 
null);
+        if (duration != null && allowed(API_CALL_DURATION_MS)) {
+            histogram(op).update(duration.toMillis());
+        }
+
+        // HTTP_STATUS_CODE lives on the per-attempt children, not on the 
top-level ApiCall record.
+        // status_class reflects the overall outcome (last attempt); the retry 
reason reflects the
+        // failures that triggered the retries (any attempt), so they are 
tracked separately.
+        int throttleResponses = 0;
+        boolean sawServerError = false;
+        Integer lastStatus = null;
+        for (MetricCollection attempt : apiCall.children()) {
+            for (Integer status : 
attempt.metricValues(HttpMetric.HTTP_STATUS_CODE)) {
+                if (status != null) {
+                    lastStatus = status;
+                    if (isThrottle(status)) {
+                        throttleResponses++;
+                    } else if (status >= 500) {
+                        sawServerError = true;
+                    }
+                }
+            }
+        }
+
+        final Boolean successful = first(apiCall, 
CoreMetric.API_CALL_SUCCESSFUL, null);
+        if (allowed(API_CALL_COUNT)) {
+            apiCallCount(op, statusClass(lastStatus, successful)).inc();
+        }
+
+        if (throttleResponses > 0 && allowed(THROTTLE_COUNT)) {
+            throttleCount(op).inc(throttleResponses);
+        }
+
+        final Integer retries = first(apiCall, CoreMetric.RETRY_COUNT, 0);
+        if (retries != null && retries > 0 && allowed(RETRY_COUNT)) {
+            retryCount(op, retryReason(throttleResponses > 0, 
sawServerError)).inc(retries);
+        }
+    }
+
+    private static boolean isThrottle(int status) {
+        return status == 429 || status == 503;
+    }
+
+    private static String statusClass(Integer status, Boolean successful) {
+        if (status == null) {
+            if (Boolean.TRUE.equals(successful)) {
+                return "2xx";
+            }
+            return Boolean.FALSE.equals(successful) ? "error" : "unknown";
+        }
+        if (isThrottle(status)) {
+            return "throttled";
+        }
+        if (status >= 200 && status < 300) {
+            return "2xx";
+        }
+        if (status >= 400 && status < 500) {
+            return "4xx";
+        }
+        if (status >= 500) {
+            return "5xx";
+        }
+        return "other";
+    }
+
+    private static String retryReason(boolean sawThrottle, boolean 
sawServerError) {

Review Comment:
   Should we, and can we, extract this into a cloud-agnostic helper?
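
For illustration, a cloud-agnostic version might look roughly like the sketch below. The class name and the `IntPredicate` parameter are hypothetical; `statusClass` mirrors the logic in this diff, while the body of `retryReason` is not visible in this hunk, so the precedence shown (throttling wins over a server error) is an assumption.

```java
import java.util.function.IntPredicate;

/** Sketch only: a provider-neutral classifier; names are hypothetical. */
final class HttpOutcomeClassifierSketch {

    // Each provider supplies its own notion of "throttled" (429/503 for S3 in this PR).
    private final IntPredicate isThrottle;

    HttpOutcomeClassifierSketch(IntPredicate isThrottle) {
        this.isThrottle = isThrottle;
    }

    /** Maps the last HTTP status (or the success flag if no status was seen) to a closed label. */
    String statusClass(Integer status, Boolean successful) {
        if (status == null) {
            if (Boolean.TRUE.equals(successful)) {
                return "2xx";
            }
            return Boolean.FALSE.equals(successful) ? "error" : "unknown";
        }
        if (isThrottle.test(status)) {
            return "throttled";
        }
        if (status >= 200 && status < 300) {
            return "2xx";
        }
        if (status >= 400 && status < 500) {
            return "4xx";
        }
        if (status >= 500) {
            return "5xx";
        }
        return "other";
    }

    /** Assumption: a throttle response takes precedence over a plain server error. */
    static String retryReason(boolean sawThrottle, boolean sawServerError) {
        if (sawThrottle) {
            return "throttled";
        }
        return sawServerError ? "5xx" : "other";
    }

    public static void main(String[] args) {
        HttpOutcomeClassifierSketch s3 =
                new HttpOutcomeClassifierSketch(code -> code == 429 || code == 503);
        System.out.println(s3.statusClass(503, null));
        System.out.println(s3.statusClass(404, false));
        System.out.println(retryReason(false, true));
    }
}
```

Only the throttle predicate is provider-specific here, so the rest could plausibly sit next to the shared interface.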
   



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as 
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at 
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry 
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code 
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a 
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and 
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are 
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no 
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and 
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are 
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+    static final String API_CALL_COUNT = "api_call_count";
+    static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+    static final String THROTTLE_COUNT = "throttle_count";
+    static final String RETRY_COUNT = "retry_count";
+    static final String IOPS = "iops";
+
+    /** The default-on metric set from FLIP-576. {@code iops} is derived, not 
registered. */
+    static final List<String> DEFAULT_ALLOWLIST =
+            Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, 
THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+    private static final String WILDCARD = "*";
+
+    private static final String LABEL_OP = "op";
+    private static final String LABEL_STATUS_CLASS = "status_class";
+    private static final String LABEL_REASON = "reason";
+
+    private static final String UNKNOWN_OP = "Unknown";
+
+    private final MetricGroup fsScope;
+    private final int histogramWindowSize;
+
+    private final boolean allowAll;
+    private final Set<String> allowlist;
+
+    // op and label sets are closed, so these maps are bounded by construction.
+    private final ConcurrentHashMap<String, Counter> counters = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+            new ConcurrentHashMap<>();
+
+    public AwsSdkMetricBridge(MetricGroup fsScope) {
+        this(fsScope, DEFAULT_ALLOWLIST, 
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+    }
+
+    public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+        this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+    }
+
+    public AwsSdkMetricBridge(
+            MetricGroup fsScope, @Nullable Collection<String> allowlist, int 
histogramWindowSize) {
+        this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not 
be null");
+        Preconditions.checkArgument(
+                histogramWindowSize > 0, "histogramWindowSize must be 
positive");
+        this.histogramWindowSize = histogramWindowSize;
+
+        if (allowlist == null || allowlist.isEmpty()) {
+            LOG.warn(
+                    "S3 metrics allowlist is empty; falling back to the 
default metric set {}",
+                    DEFAULT_ALLOWLIST);
+            this.allowAll = false;
+            this.allowlist = new HashSet<>(DEFAULT_ALLOWLIST);
+        } else if (allowlist.contains(WILDCARD)) {
+            this.allowAll = true;
+            this.allowlist = new HashSet<>();
+        } else {
+            this.allowAll = false;
+            this.allowlist = new HashSet<>(allowlist);
+        }
+    }
+
+    private boolean allowed(String metricName) {
+        return allowAll || allowlist.contains(metricName);
+    }
+
+    @Override
+    public void publish(MetricCollection apiCall) {
+        try {
+            translate(apiCall);
+        } catch (Throwable t) {

Review Comment:
   Do we really want to catch `Throwable` here?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as 
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at 
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry 
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code 
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a 
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and 
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are 
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no 
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and 
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are 
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+    static final String API_CALL_COUNT = "api_call_count";
+    static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+    static final String THROTTLE_COUNT = "throttle_count";
+    static final String RETRY_COUNT = "retry_count";
+    static final String IOPS = "iops";
+
+    /** The default-on metric set from FLIP-576. {@code iops} is derived, not 
registered. */
+    static final List<String> DEFAULT_ALLOWLIST =
+            Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, 
THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+    private static final String WILDCARD = "*";
+
+    private static final String LABEL_OP = "op";
+    private static final String LABEL_STATUS_CLASS = "status_class";
+    private static final String LABEL_REASON = "reason";
+
+    private static final String UNKNOWN_OP = "Unknown";
+
+    private final MetricGroup fsScope;
+    private final int histogramWindowSize;
+
+    private final boolean allowAll;
+    private final Set<String> allowlist;
+
+    // op and label sets are closed, so these maps are bounded by construction.
+    private final ConcurrentHashMap<String, Counter> counters = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+            new ConcurrentHashMap<>();
+
+    public AwsSdkMetricBridge(MetricGroup fsScope) {
+        this(fsScope, DEFAULT_ALLOWLIST, 
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+    }
+
+    public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+        this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+    }
+
+    public AwsSdkMetricBridge(
+            MetricGroup fsScope, @Nullable Collection<String> allowlist, int 
histogramWindowSize) {
+        this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not 
be null");
+        Preconditions.checkArgument(
+                histogramWindowSize > 0, "histogramWindowSize must be 
positive");
+        this.histogramWindowSize = histogramWindowSize;
+
+        if (allowlist == null || allowlist.isEmpty()) {

Review Comment:
   I feel like the allowlist could be implemented at the config-options level, 
with the `default` resolved on config read. Then you don't have to handle 
fallbacks in the code. 
   
   Also, similar to my earlier question: do we want to scope this down to the 
AWS SDK bridge, or are these configs universal enough to be extracted?
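   
   For illustration, a minimal plain-Java sketch of resolving the default at 
read time, so the consumer never sees a null or empty allowlist. The class 
name, the option key and the comma-separated format are assumptions made for 
this sketch, not something taken from the PR; in Flink itself this would 
presumably be a `ConfigOption` with default values instead.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical sketch: key and class name are illustrative, not from the PR. */
   final class AllowlistOption {

       /** Assumed option key, chosen only for this example. */
       static final String KEY = "fs.metrics.allowlist";

       /** The five default metric names listed in the bridge's Javadoc. */
       static final List<String> DEFAULT =
               Arrays.asList(
                       "api_call_count",
                       "api_call_duration_ms",
                       "throttle_count",
                       "retry_count",
                       "iops");

       private AllowlistOption() {}

       /** Parses the configured value; a missing or blank value yields the default set. */
       static Set<String> resolve(Map<String, String> conf) {
           Set<String> parsed = new LinkedHashSet<>();
           String raw = conf.get(KEY);
           if (raw != null) {
               for (String entry : raw.split(",")) {
                   String trimmed = entry.trim();
                   if (!trimmed.isEmpty()) {
                       parsed.add(trimmed);
                   }
               }
           }
           if (parsed.isEmpty()) {
               // The fallback lives here, at config read, not in the bridge.
               parsed.addAll(DEFAULT);
           }
           return Collections.unmodifiableSet(parsed);
       }
   }
   ```

   With something like this, the bridge constructor could take a non-null, 
non-empty set and drop its own fallback branch.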



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records 
into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+    @Test
+    void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 
200));
+
+        
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+        Histogram histogram = 
root.histograms.get("op=PutObject/api_call_duration_ms");
+        assertThat(histogram).isNotNull();
+        assertThat(histogram.getCount()).isEqualTo(1L);
+        assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+        
assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+    }
+
+    @Test
+    void throttledCallIncrementsThrottleAndRetryCounts() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        // Two throttled attempts (503) followed by a successful one (200); 
RETRY_COUNT = 2.
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+        // The final attempt succeeded, so the overall call is classified 2xx.
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+    }
+
+    @Test
+    void clientErrorIsClassifiedAs4xx() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0, 
404));
+
+        
assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+    }
+
+    @Test
+    void allowlistRegistersOnlyTheListedMetrics() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        // Only api_call_count is allowed; duration, throttle and retry must 
be skipped.
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        
Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+    }
+
+    @Test
+    void wildcardAllowlistRegistersEveryMetric() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        Collections.singletonList("*"),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull();
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+    }
+
+    @Test
+    void emptyAllowlistFallsBackToDefaults() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root, Collections.emptyList(), 
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 
200));
+
+        // The five default metrics include api_call_count and 
api_call_duration_ms.
+        
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms.get("op=PutObject/api_call_duration_ms")).isNotNull();
+    }
+
+    @Test
+    void serverErrorRetryIsClassifiedAs5xx() {

Review Comment:
   I would be very explicit that this is AWS-specific behaviour. Ideally, I 
would abstract error classification and do it outside of the bridge. The 
bridge should only map the exact response to an abstract error class that can 
be re-used for any cloud. Throttling is not AWS-specific.
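   
   A rough sketch of the split I have in mind, with all type names 
hypothetical. The status-code mapping is an assumption for illustration: 503 
is treated as throttling because the tests in this PR do so, and 429 is added 
as the generic HTTP "Too Many Requests" code.

   ```java
   /** Cloud-neutral outcome classes; the names are illustrative only. */
   enum ErrorClass {
       SUCCESS,
       CLIENT_ERROR,
       SERVER_ERROR,
       THROTTLED,
       OTHER
   }

   /** Provider-specific mapping from a raw response to a neutral class. */
   interface ErrorClassifier {
       ErrorClass classify(int httpStatus);
   }

   /** Hypothetical S3 flavour; another cloud would supply its own classifier. */
   final class S3ErrorClassifier implements ErrorClassifier {

       @Override
       public ErrorClass classify(int httpStatus) {
           // Assumption: 503 and 429 signal throttling for this provider.
           if (httpStatus == 503 || httpStatus == 429) {
               return ErrorClass.THROTTLED;
           }
           if (httpStatus >= 200 && httpStatus < 300) {
               return ErrorClass.SUCCESS;
           }
           if (httpStatus >= 400 && httpStatus < 500) {
               return ErrorClass.CLIENT_ERROR;
           }
           if (httpStatus >= 500 && httpStatus < 600) {
               return ErrorClass.SERVER_ERROR;
           }
           return ErrorClass.OTHER;
       }
   }
   ```

   The bridge would then depend only on `ErrorClassifier` and `ErrorClass`, 
and the AWS-specific rules stay in one clearly named place.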



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test proving that real S3 operations performed through {@code 
NativeS3FileSystem} are
+ * translated into Flink metrics by {@link AwsSdkMetricBridge} and become 
visible in a real Flink
+ * metric registry.
+ *
+ * <p>Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with 
synthesized SDK records
+ * and a fake {@code MetricGroup}), this test exercises the full chain: the 
AWS SDK actually invokes
+ * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after 
each completed API
+ * call, the bridge registers and updates {@link Counter}/{@link Histogram} 
handles, and the
+ * assertions read those handles back through {@link MetricListener}'s real 
{@code MetricRegistry}.
+ *
+ * <p>Assertions use only GET/HEAD/LIST round trips, which carry no request 
body and are therefore
+ * unaffected by the request-checksum behaviour newer AWS SDK versions apply 
to {@code PutObject}.
+ *
+ * <p>Requires Docker; auto-skipped when Docker is unavailable.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class NativeS3MetricsEmissionITCase {
+
+    private static final String MINIO_IMAGE = 
"minio/minio:RELEASE.2022-02-07T08-17-33Z";

Review Comment:
   Why do we hardcode this image here? Do we have a more centralised way to 
manage MinIO tests?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -377,6 +377,33 @@ protected void initializeServices(Configuration 
configuration, PluginManager plu
             configuration.set(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
             configuration.set(JobManagerOptions.PORT, 
commonRpcService.getPort());
 
+            metricRegistry = createMetricRegistry(configuration, 
pluginManager, rpcSystem);
+
+            final RpcService metricQueryServiceRpcService =
+                    MetricUtils.startRemoteMetricsRpcService(
+                            configuration,
+                            commonRpcService.getAddress(),
+                            configuration.get(JobManagerOptions.BIND_HOST),
+                            rpcSystem);
+            metricRegistry.startQueryService(metricQueryServiceRpcService, 
null);
+
+            final String hostname = RpcUtils.getHostname(commonRpcService);
+
+            processMetricGroup =
+                    MetricUtils.instantiateProcessMetricGroup(
+                            metricRegistry,
+                            hostname,
+                            
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
+                                    configuration));
+
+            // Second-phase init for file system plugins that opt into metrics 
(e.g.
+            // flink-s3-fs-native): hand them the process-level metric group 
before any file system
+            // is used. This must run ahead of the HA services and BlobServer 
below, because those
+            // may open external file systems (e.g. S3 HA/blob storage), 
creating them first would
+            // cache metric-less file system clients for the rest of the 
process lifetime. See

Review Comment:
   Is the problem here that the FS doesn't have its own `close` hook? If so, I 
think it is an FS bug that we should fix, for many reasons, so that all 
FS-related resources can be freed as part of FS shutdown. Also, the FS 
lifecycle is typically the same as the whole TM/JM VM lifecycle. Could you 
please explain what we are trying to achieve?



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link 
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link 
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory}, 
which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap 
the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no 
metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+    @AfterEach
+    void resetFileSystems() {
+        // Restore the default, plugin-less factory registry so other tests 
are unaffected.
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @Test
+    void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("metrics-test-fs");
+        initializeWithPlugins(factory);
+
+        RecordingMetricGroup processGroup = new RecordingMetricGroup();
+        FileSystem.attachMetrics(processGroup);
+
+        // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+        assertThat(factory.setMetricGroupCalls).hasValue(1);
+        // The group handed to the factory is the "filesystem" child of the 
process group, not the
+        // process group itself.
+        assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+        
assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+    }
+
+    @Test
+    void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
+        initializeWithPlugins(new PlainFactory("plain-test-fs"));
+
+        assertThatCode(() -> FileSystem.attachMetrics(new 
UnregisteredMetricsGroup()))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void attachMetricsIsResilientToAFactoryThatThrows() {
+        RecordingMetricsAwareFactory ok = new 
RecordingMetricsAwareFactory("ok-test-fs");
+        initializeWithPlugins(new 
ThrowingMetricsAwareFactory("throwing-test-fs"), ok);
+
+        // A misbehaving plugin must never break process startup, and 
well-behaved factories must
+        // still receive their group regardless of iteration order.
+        assertThatCode(() -> FileSystem.attachMetrics(new 
UnregisteredMetricsGroup()))
+                .doesNotThrowAnyException();
+        assertThat(ok.setMetricGroupCalls).hasValue(1);

Review Comment:
   Do we have a test that would show `2` for multiple successful attachments?



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory> 
getRegisteredFileSystemFactories() {
         }
     }
 
+    /**
+     * Hands a runtime-owned, process-level {@link MetricGroup} to every 
registered {@link
+     * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+     *
+     * <p>This is the second phase of file system initialization. {@link 
#initialize(Configuration,
+     * PluginManager)} runs at process startup, before the {@code 
MetricRegistry} exists; this
+     * method is therefore invoked separately, once the registry and a 
process-level {@link
+     * MetricGroup} are available. It is called from the TaskManager and 
JobManager entrypoints
+     * only. Contexts without a process-level {@link MetricGroup} (CLI, 
HistoryServer, YARN client)
+     * simply never call it, and their file system plugins continue to operate 
without emitting
+     * metrics.
+     *
+     * <p>The call is idempotent: factories receive a child group {@code 
<process>.filesystem}, and
+     * {@link MetricGroup#addGroup} returns the same child on repeated calls 
with the same parent,
+     * so re-invocation does not register duplicate metrics. Factories that do 
not implement {@link
+     * MetricsAware} are skipped.
+     *
+     * @param processMetricGroup the process-level metric group to register 
file system metrics
+     *     under.
+     */
+    @Internal
+    public static void attachMetrics(MetricGroup processMetricGroup) {
+        checkNotNull(processMetricGroup, "processMetricGroup");
+        LOCK.lock();
+        try {
+            final MetricGroup fsGroup = 
processMetricGroup.addGroup("filesystem");
+            for (FileSystemFactory factory : FS_FACTORIES.values()) {
+                // Plugin-loaded factories are wrapped in a 
PluginFileSystemFactory, which is itself
+                // MetricsAware and forwards setMetricGroup to the inner 
factory under the plugin
+                // classloader, so this plain instanceof reaches both wrapped 
and direct factories.
+                if (factory instanceof MetricsAware) {
+                    try {
+                        ((MetricsAware) factory).setMetricGroup(fsGroup);
+                    } catch (Throwable t) {

Review Comment:
   Do we really want to catch everything, including OOMs, here? That seems 
unnecessarily broad.
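   
   A narrower variant could look roughly like the sketch below. It is a 
stand-alone illustration with a hypothetical helper and plain `Consumer`s in 
place of the factories: catching `Exception` still isolates a misbehaving 
plugin, while `Error`s such as `OutOfMemoryError` propagate to the caller.

   ```java
   import java.util.List;
   import java.util.function.Consumer;

   /** Hypothetical helper, not code from this PR. */
   final class IsolatedAttach {

       private IsolatedAttach() {}

       /**
        * Hands the group to every receiver and returns how many failed with an
        * Exception. Errors are deliberately not caught and abort the loop.
        */
       static <G> int attachAll(List<Consumer<G>> receivers, G group) {
           int failures = 0;
           for (Consumer<G> receiver : receivers) {
               try {
                   receiver.accept(group);
               } catch (Exception e) {
                   // Real code would log here; the sketch only counts the failure
                   // and moves on to the next receiver.
                   failures++;
               }
           }
           return failures;
       }
   }
   ```

   Well-behaved receivers still get their group regardless of iteration 
order, which is what the resilience test checks, but a fatal JVM error is no 
longer swallowed.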



##########
flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Capability marker for {@link Plugin}s that want to register Flink metrics.
+ *
+ * <p>This is an opt-in extension to the plugin SPI. A plugin declares {@code 
implements
+ * SomePluginSpi, MetricsAware}; plugins that do not implement it are 
byte-for-byte unchanged and
+ * emit no metrics.
+ *
+ * <p><b>Two-phase init contract.</b> The runtime invokes {@link 
#setMetricGroup(MetricGroup)} after
+ * {@link Plugin#configure(org.apache.flink.configuration.Configuration)} and 
before any operation
+ * that would emit a metric. The call happens only from runtime entrypoints 
that own a process-level
+ * {@link MetricGroup} (TaskManager and JobManager), via {@link
+ * org.apache.flink.core.fs.FileSystem#attachMetrics(MetricGroup)}. Contexts 
without such a group
+ * (CLI, HistoryServer, YARN client, embedded usage) never call it, in which 
case the plugin must
+ * continue to operate normally and emit no metrics.
+ *
+ * <p><b>Idempotency.</b> {@code setMetricGroup} may be invoked more than once 
(for example if
+ * metrics are re-attached). Implementations must treat repeated invocations 
idempotently: a call
+ * with the same {@link MetricGroup} must not register duplicate metrics, and 
a call with a

Review Comment:
   What deduplicates metric groups for different plugins if they all register 
under the `filesystem` name?
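   
   To make the question concrete, here is a toy sketch of one possible 
layout, where each plugin gets its own child keyed by scheme under the shared 
`filesystem` group. `ToyGroup` is a stand-in written for this comment, not 
Flink's `MetricGroup`, and the `<process>.filesystem.<scheme>` layout is an 
assumption rather than what the PR does.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   /** Toy stand-in for a metric group tree; not Flink's MetricGroup. */
   final class ToyGroup {

       private final String path;
       private final Map<String, ToyGroup> children = new ConcurrentHashMap<>();

       ToyGroup(String path) {
           this.path = path;
       }

       /** Returns the existing child for this name, creating it only on first use. */
       ToyGroup addGroup(String name) {
           return children.computeIfAbsent(name, n -> new ToyGroup(path + "." + n));
       }

       String path() {
           return path;
       }
   }

   /** Hypothetical helper that derives a per-plugin scope. */
   final class PerPluginScope {

       private PerPluginScope() {}

       /** Assumed layout: one child per scheme below the shared "filesystem" group. */
       static ToyGroup scopeFor(ToyGroup process, String scheme) {
           return process.addGroup("filesystem").addGroup(scheme);
       }
   }
   ```

   In this layout two plugins never share a leaf scope, and a repeated call 
for the same scheme returns the same child, so metric names cannot collide 
across plugins.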



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory> 
getRegisteredFileSystemFactories() {
         }
     }
 
+    /**
+     * Hands a runtime-owned, process-level {@link MetricGroup} to every 
registered {@link
+     * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+     *
+     * <p>This is the second phase of file system initialization. {@link 
#initialize(Configuration,
+     * PluginManager)} runs at process startup, before the {@code 
MetricRegistry} exists; this
+     * method is therefore invoked separately, once the registry and a 
process-level {@link
+     * MetricGroup} are available. It is called from the TaskManager and 
JobManager entrypoints
+     * only. Contexts without a process-level {@link MetricGroup} (CLI, 
HistoryServer, YARN client)
+     * simply never call it, and their file system plugins continue to operate 
without emitting
+     * metrics.
+     *
+     * <p>The call is idempotent: factories receive a child group {@code 
<process>.filesystem}, and
+     * {@link MetricGroup#addGroup} returns the same child on repeated calls 
with the same parent,
+     * so re-invocation does not register duplicate metrics. Factories that do 
not implement {@link
+     * MetricsAware} are skipped.
+     *
+     * @param processMetricGroup the process-level metric group to register 
file system metrics
+     *     under.
+     */
+    @Internal
+    public static void attachMetrics(MetricGroup processMetricGroup) {
+        checkNotNull(processMetricGroup, "processMetricGroup");
+        LOCK.lock();
+        try {
+            final MetricGroup fsGroup = 
processMetricGroup.addGroup("filesystem");

Review Comment:
   Hypothetically, can it result in group clash if we have different FS 
instances with telemetry enabled at the same time? 



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link 
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link 
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory}, 
which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap 
the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no 
metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+    @AfterEach
+    void resetFileSystems() {
+        // Restore the default, plugin-less factory registry so other tests 
are unaffected.
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @Test
+    void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("metrics-test-fs");
+        initializeWithPlugins(factory);
+
+        RecordingMetricGroup processGroup = new RecordingMetricGroup();
+        FileSystem.attachMetrics(processGroup);
+
+        // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+        assertThat(factory.setMetricGroupCalls).hasValue(1);
+        // The group handed to the factory is the "filesystem" child of the 
process group, not the
+        // process group itself.
+        assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+        
assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+    }
+
+    @Test
+    void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {

Review Comment:
   This is misleading: the name talks about `factories`, but the test covers 
only a plain factory. It should either be parametrised or have a more explicit 
name plus an explanation of why testing only the plain factory is enough.



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.MetricsAware;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/**
+ * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link 
MetricsAware} two-phase
+ * init contract.
+ *
+ * <p>The headline case is {@link 
#attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
+ * file systems are registered wrapped in a {@link PluginFileSystemFactory}, 
which does <em>not</em>
+ * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap 
the proxy to reach the
+ * real factory, otherwise the metric group is silently never delivered and no 
metrics are ever
+ * emitted.
+ */
+class FileSystemAttachMetricsTest {
+
+    @AfterEach
+    void resetFileSystems() {
+        // Restore the default, plugin-less factory registry so other tests 
are unaffected.
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @Test
+    void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("metrics-test-fs");
+        initializeWithPlugins(factory);
+
+        RecordingMetricGroup processGroup = new RecordingMetricGroup();
+        FileSystem.attachMetrics(processGroup);
+
+        // Unwrapped through PluginFileSystemFactory and invoked exactly once.
+        assertThat(factory.setMetricGroupCalls).hasValue(1);
+        // The group handed to the factory is the "filesystem" child of the 
process group, not the
+        // process group itself.
+        assertThat(processGroup.childGroupNames).containsExactly("filesystem");
+        
assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
+    }
+
+    @Test
+    void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
+        initializeWithPlugins(new PlainFactory("plain-test-fs"));
+
+        assertThatCode(() -> FileSystem.attachMetrics(new 
UnregisteredMetricsGroup()))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void attachMetricsIsResilientToAFactoryThatThrows() {
+        RecordingMetricsAwareFactory ok = new 
RecordingMetricsAwareFactory("ok-test-fs");
+        initializeWithPlugins(new 
ThrowingMetricsAwareFactory("throwing-test-fs"), ok);
+
+        // A misbehaving plugin must never break process startup, and 
well-behaved factories must
+        // still receive their group regardless of iteration order.
+        assertThatCode(() -> FileSystem.attachMetrics(new 
UnregisteredMetricsGroup()))
+                .doesNotThrowAnyException();
+        assertThat(ok.setMetricGroupCalls).hasValue(1);
+    }
+
+    @Test
+    void attachMetricsDoesNotThrowWhenInvokedRepeatedly() {
+        RecordingMetricsAwareFactory factory = new 
RecordingMetricsAwareFactory("idem-test-fs");
+        initializeWithPlugins(factory);
+
+        MetricGroup group = new UnregisteredMetricsGroup();
+        assertThatCode(
+                        () -> {
+                            FileSystem.attachMetrics(group);
+                            FileSystem.attachMetrics(group);
+                        })
+                .doesNotThrowAnyException();

Review Comment:
   This only verifies that a repeated call doesn't throw; it doesn't verify 
idempotency.
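
   A rough sketch of what a stronger check could assert, under simplifying 
assumptions: `RecordingGroup`, `RecordingAwareFactory` and the one-line 
`attachMetrics` model below are hypothetical stand-ins, not the PR's 
`RecordingMetricGroup` / `RecordingMetricsAwareFactory` or the real 
`FileSystem.attachMetrics`. The idea is to assert that two calls create the 
`filesystem` child once and hand the factory the same group instance both 
times.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a metric group; records how often a child is created. */
final class RecordingGroup {
    final Map<String, RecordingGroup> children = new HashMap<>();
    int childCreations;

    /** Returns the existing child for a name, creating it only on first use. */
    RecordingGroup addGroup(String name) {
        RecordingGroup existing = children.get(name);
        if (existing == null) {
            existing = new RecordingGroup();
            children.put(name, existing);
            childCreations++;
        }
        return existing;
    }
}

/** Hypothetical stand-in for a MetricsAware factory; records every group it is handed. */
final class RecordingAwareFactory {
    final List<RecordingGroup> received = new ArrayList<>();

    void setMetricGroup(RecordingGroup group) {
        received.add(group);
    }
}

final class AttachMetricsIdempotencySketch {
    /** Simplified model of attachMetrics: hand the "filesystem" child to the factory. */
    static void attachMetrics(RecordingGroup processGroup, RecordingAwareFactory factory) {
        factory.setMetricGroup(processGroup.addGroup("filesystem"));
    }

    /** True if two attach calls created one child and delivered the same instance twice. */
    static boolean repeatedAttachIsIdempotent() {
        RecordingGroup processGroup = new RecordingGroup();
        RecordingAwareFactory factory = new RecordingAwareFactory();
        attachMetrics(processGroup, factory);
        attachMetrics(processGroup, factory);
        return processGroup.childCreations == 1
                && factory.received.size() == 2
                && factory.received.get(0) == factory.received.get(1);
    }

    public static void main(String[] args) {
        System.out.println("idempotent: " + repeatedAttachIsIdempotent());
    }
}
```

   In the real test the same three assertions would presumably go against the 
recording helpers already defined in `FileSystemAttachMetricsTest`.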



##########
flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * Capability marker for {@link Plugin}s that want to register Flink metrics.
+ *
+ * <p>This is an opt-in extension to the plugin SPI. A plugin declares {@code 
implements
+ * SomePluginSpi, MetricsAware}; plugins that do not implement it are 
byte-for-byte unchanged and

Review Comment:
   !nit. This doc is a bit bloated. I don't think we need to overexplain how 
to use interfaces (I am especially triggered by clarifications that are not 
"load bearing", such as `byte-for-byte unchanged`).



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.util.Arrays;
+
+/**
+ * Minimal sliding-window {@link Histogram} used by {@link AwsSdkMetricBridge} 
for {@code
+ * api_call_duration_ms}.
+ *
+ * <p>Backed by a fixed-size circular buffer holding the most recent {@code 
windowSize} samples
+ * (default {@value #DEFAULT_WINDOW_SIZE}). This bounds memory regardless of 
request volume and
+ * keeps {@link #update(long)} O(1); statistics are computed from a sorted 
snapshot of the window.
+ *
+ * <p>flink-s3-fs-native deliberately keeps a minimal dependency footprint and 
does not depend on
+ * flink-runtime, so {@code DescriptiveStatisticsHistogram} is not available; 
this is a small
+ * self-contained equivalent.
+ */
+@Internal
+public class S3MetricHistogram implements Histogram {

Review Comment:
   Does this class have anything S3-related? It looks like a good candidate to 
be extracted as cloud-agnostic code.
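
   To illustrate, a minimal provider-neutral sketch of the circular-buffer 
idea described in the Javadoc above. The class name `SlidingWindowHistogram` 
and its methods are hypothetical, and it intentionally does not implement 
Flink's `Histogram` interface so that it stays self-contained; it is not the 
PR's implementation.

```java
import java.util.Arrays;

/** Hypothetical cloud-agnostic sliding-window histogram over the most recent samples. */
final class SlidingWindowHistogram {
    private final long[] window; // circular buffer of the most recent samples
    private long count;          // total number of samples ever recorded

    SlidingWindowHistogram(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.window = new long[windowSize];
    }

    /** O(1): overwrites the oldest slot once the window is full. */
    synchronized void update(long value) {
        window[(int) (count % window.length)] = value;
        count++;
    }

    synchronized long getCount() {
        return count;
    }

    /** Sorted copy of the samples currently held in the window. */
    synchronized long[] sortedSnapshot() {
        int size = (int) Math.min(count, window.length);
        long[] copy = Arrays.copyOf(window, size);
        Arrays.sort(copy);
        return copy;
    }

    /** Smallest sample in the window, or 0 if nothing has been recorded. */
    long min() {
        long[] sorted = sortedSnapshot();
        return sorted.length == 0 ? 0L : sorted[0];
    }

    /** Largest sample in the window, or 0 if nothing has been recorded. */
    long max() {
        long[] sorted = sortedSnapshot();
        return sorted.length == 0 ? 0L : sorted[sorted.length - 1];
    }

    public static void main(String[] args) {
        SlidingWindowHistogram histogram = new SlidingWindowHistogram(2);
        histogram.update(10);
        histogram.update(30);
        histogram.update(20); // evicts 10, the oldest sample
        System.out.println(histogram.min() + " .. " + histogram.max());
    }
}
```

   Nothing in this shape refers to S3 or the AWS SDK, which is the reason it 
looks extractable.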



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records 
into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+    @Test
+    void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 
200));
+
+        
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+        Histogram histogram = 
root.histograms.get("op=PutObject/api_call_duration_ms");
+        assertThat(histogram).isNotNull();
+        assertThat(histogram.getCount()).isEqualTo(1L);
+        assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+        
assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+    }
+
+    @Test
+    void throttledCallIncrementsThrottleAndRetryCounts() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        // Two throttled attempts (503) followed by a successful one (200); 
RETRY_COUNT = 2.
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+        // The final attempt succeeded, so the overall call is classified 2xx.
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+    }
+
+    @Test
+    void clientErrorIsClassifiedAs4xx() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0, 
404));
+
+        
assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+    }
+
+    @Test
+    void allowlistRegistersOnlyTheListedMetrics() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        // Only api_call_count is allowed; duration, throttle and retry must 
be skipped.
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        
Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+    }
+
+    @Test
+    void wildcardAllowlistRegistersEveryMetric() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        Collections.singletonList("*"),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull();
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+    }
+
+    @Test
+    void emptyAllowlistFallsBackToDefaults() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root, Collections.emptyList(), 
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 
200));
+
+        // The five default metrics include api_call_count and 
api_call_duration_ms.
+        
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms.get("op=PutObject/api_call_duration_ms")).isNotNull();
+    }
+
+    @Test
+    void serverErrorRetryIsClassifiedAs5xx() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("GetObject", Duration.ofMillis(50), true, 1, 
500, 200));
+
+        
assertThat(root.count("op=GetObject/reason=5xx/retry_count")).isEqualTo(1L);
+        
assertThat(root.counters).doesNotContainKey("op=GetObject/throttle_count");
+    }
+
+    @Test
+    void accumulatesAcrossMultipleCalls() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("GetObject", Duration.ofMillis(10), true, 0, 
200));
+        bridge.publish(apiCall("GetObject", Duration.ofMillis(30), true, 0, 
200));
+
+        
assertThat(root.count("op=GetObject/status_class=2xx/api_call_count")).isEqualTo(2L);
+        Histogram histogram = 
root.histograms.get("op=GetObject/api_call_duration_ms");
+        assertThat(histogram.getCount()).isEqualTo(2L);
+        assertThat(histogram.getStatistics().getMin()).isEqualTo(10L);
+        assertThat(histogram.getStatistics().getMax()).isEqualTo(30L);
+    }
+
+    @Test
+    void publishOfEmptyCollectionDoesNotThrowAndUsesUnknownOp() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(MetricCollector.create("ApiCall").collect());
+
+        assertThat(root.counters.keySet()).anyMatch(key -> 
key.contains("op=Unknown"));
+    }
+
+    private static MetricCollection apiCall(
+            String op, Duration duration, boolean successful, int retries, 
int... attemptStatuses) {
+        MetricCollector apiCall = MetricCollector.create("ApiCall");
+        apiCall.reportMetric(CoreMetric.OPERATION_NAME, op);
+        apiCall.reportMetric(CoreMetric.API_CALL_DURATION, duration);
+        apiCall.reportMetric(CoreMetric.API_CALL_SUCCESSFUL, successful);
+        apiCall.reportMetric(CoreMetric.RETRY_COUNT, retries);
+        for (int status : attemptStatuses) {
+            MetricCollector attempt = apiCall.createChild("ApiCallAttempt");
+            attempt.reportMetric(HttpMetric.HTTP_STATUS_CODE, status);
+        }
+        return apiCall.collect();
+    }
+
+    /** A {@link MetricGroup} that captures registered metrics keyed by their 
full label path. */
+    private static final class CapturingMetricGroup extends 
UnregisteredMetricsGroup {

Review Comment:
   It feels like we have similar duplicated code in other places. Should this 
be extracted into a lightweight dependency module for metric testing utils? 
@rkhachatryan, do you know what the common pattern in the Flink codebase is 
for repetitive testing mocks?



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -316,6 +318,53 @@ public static List<FileSystemFactory> 
getRegisteredFileSystemFactories() {
         }
     }
 
+    /**
+     * Hands a runtime-owned, process-level {@link MetricGroup} to every 
registered {@link
+     * FileSystemFactory} that opts into metrics via {@link MetricsAware}.
+     *
+     * <p>This is the second phase of file system initialization. {@link 
#initialize(Configuration,
+     * PluginManager)} runs at process startup, before the {@code 
MetricRegistry} exists; this
+     * method is therefore invoked separately, once the registry and a 
process-level {@link
+     * MetricGroup} are available. It is called from the TaskManager and 
JobManager entrypoints
+     * only. Contexts without a process-level {@link MetricGroup} (CLI, 
HistoryServer, YARN client)
+     * simply never call it, and their file system plugins continue to operate 
without emitting
+     * metrics.
+     *
+     * <p>The call is idempotent: factories receive a child group {@code 
<process>.filesystem}, and
+     * {@link MetricGroup#addGroup} returns the same child on repeated calls 
with the same parent,
+     * so re-invocation does not register duplicate metrics. Factories that do 
not implement {@link
+     * MetricsAware} are skipped.
+     *
+     * @param processMetricGroup the process-level metric group to register 
file system metrics
+     *     under.
+     */
+    @Internal
+    public static void attachMetrics(MetricGroup processMetricGroup) {
+        checkNotNull(processMetricGroup, "processMetricGroup");
+        LOCK.lock();
+        try {
+            final MetricGroup fsGroup = 
processMetricGroup.addGroup("filesystem");
+            for (FileSystemFactory factory : FS_FACTORIES.values()) {
+                // Plugin-loaded factories are wrapped in a 
PluginFileSystemFactory, which is itself
+                // MetricsAware and forwards setMetricGroup to the inner 
factory under the plugin
+                // classloader, so this plain instanceof reaches both wrapped 
and direct factories.
+                if (factory instanceof MetricsAware) {

Review Comment:
   Would this work for wrapped filesystems (e.g. for 
`ConnectionLimitingFileSystemFactory`)? Will all wrappers have to implement 
this interface too?
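
   For reference, a sketch of what forwarding in a wrapper might look like if 
each wrapper does have to opt in. All types here (`Group`, `Factory`, 
`MetricsAware`, `ForwardingWrapperFactory`) are hypothetical stand-ins 
written for this comment, not Flink's interfaces, and whether 
`ConnectionLimitingFileSystemFactory` should do this is exactly the open 
question.

```java
/** Hypothetical stand-in for a metric group. */
interface Group {}

/** Hypothetical stand-in for a file system factory. */
interface Factory {
    String scheme();
}

/** Hypothetical stand-in for the opt-in metrics capability. */
interface MetricsAware {
    void setMetricGroup(Group group);
}

/** A factory that opts into metrics and remembers the group it was given. */
final class RecordingFactory implements Factory, MetricsAware {
    Group received;

    @Override
    public String scheme() {
        return "recording";
    }

    @Override
    public void setMetricGroup(Group group) {
        this.received = group;
    }
}

/** A factory that does not opt into metrics. */
final class PlainFactory implements Factory {
    @Override
    public String scheme() {
        return "plain";
    }
}

/** Wrapper that is itself MetricsAware and forwards only when the inner factory opts in. */
final class ForwardingWrapperFactory implements Factory, MetricsAware {
    private final Factory inner;

    ForwardingWrapperFactory(Factory inner) {
        this.inner = inner;
    }

    @Override
    public String scheme() {
        return inner.scheme();
    }

    @Override
    public void setMetricGroup(Group group) {
        // Without this forwarding, a plain instanceof check on the wrapper would
        // either skip it or stop at the wrapper and never reach the inner factory.
        if (inner instanceof MetricsAware) {
            ((MetricsAware) inner).setMetricGroup(group);
        }
    }
}
```

   The downside of this shape is that every wrapper has to remember to 
forward; an alternative would be a single unwrapping step before the 
`instanceof` check.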



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as 
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at 
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry 
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code 
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a 
closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 
4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and 
{@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are 
cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no 
per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and 
the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are 
safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+    static final String API_CALL_COUNT = "api_call_count";
+    static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+    static final String THROTTLE_COUNT = "throttle_count";
+    static final String RETRY_COUNT = "retry_count";
+    static final String IOPS = "iops";
+
+    /** The default-on metric set from FLIP-576. {@code iops} is derived, not 
registered. */
+    static final List<String> DEFAULT_ALLOWLIST =
+            Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, 
THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+    private static final String WILDCARD = "*";
+
+    private static final String LABEL_OP = "op";
+    private static final String LABEL_STATUS_CLASS = "status_class";
+    private static final String LABEL_REASON = "reason";
+
+    private static final String UNKNOWN_OP = "Unknown";
+
+    private final MetricGroup fsScope;
+    private final int histogramWindowSize;
+
+    private final boolean allowAll;
+    private final Set<String> allowlist;
+
+    // op and label sets are closed, so these maps are bounded by construction.
+    private final ConcurrentHashMap<String, Counter> counters = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, S3MetricHistogram> histograms =
+            new ConcurrentHashMap<>();
+
+    public AwsSdkMetricBridge(MetricGroup fsScope) {
+        this(fsScope, DEFAULT_ALLOWLIST, 
S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+    }
+
+    public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) {
+        this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize);
+    }
+
+    public AwsSdkMetricBridge(
+            MetricGroup fsScope, @Nullable Collection<String> allowlist, int 
histogramWindowSize) {
+        this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not 
be null");
+        Preconditions.checkArgument(
+                histogramWindowSize > 0, "histogramWindowSize must be 
positive");
+        this.histogramWindowSize = histogramWindowSize;
+
+        if (allowlist == null || allowlist.isEmpty()) {
+            LOG.warn(
+                    "S3 metrics allowlist is empty; falling back to the 
default metric set {}",
+                    DEFAULT_ALLOWLIST);
+            this.allowAll = false;
+            this.allowlist = new HashSet<>(DEFAULT_ALLOWLIST);
+        } else if (allowlist.contains(WILDCARD)) {
+            this.allowAll = true;
+            this.allowlist = new HashSet<>();
+        } else {
+            this.allowAll = false;
+            this.allowlist = new HashSet<>(allowlist);
+        }
+    }
+
+    private boolean allowed(String metricName) {
+        return allowAll || allowlist.contains(metricName);
+    }
+
+    @Override
+    public void publish(MetricCollection apiCall) {
+        try {
+            translate(apiCall);
+        } catch (Throwable t) {
+            // Defence in depth: a metric failure must never affect S3 IO.
+            LOG.debug("Failed to publish S3 SDK metrics", t);
+        }
+    }
+
+    private void translate(MetricCollection apiCall) {
+        final String op = first(apiCall, CoreMetric.OPERATION_NAME, 
UNKNOWN_OP);
+
+        final Duration duration = first(apiCall, CoreMetric.API_CALL_DURATION, 
null);
+        if (duration != null && allowed(API_CALL_DURATION_MS)) {
+            histogram(op).update(duration.toMillis());
+        }
+
+        // HTTP_STATUS_CODE lives on the per-attempt children, not on the 
top-level ApiCall record.
+        // status_class reflects the overall outcome (last attempt); the retry 
reason reflects the
+        // failures that triggered the retries (any attempt), so they are 
tracked separately.
+        int throttleResponses = 0;
+        boolean sawServerError = false;
+        Integer lastStatus = null;
+        for (MetricCollection attempt : apiCall.children()) {
+            for (Integer status : 
attempt.metricValues(HttpMetric.HTTP_STATUS_CODE)) {
+                if (status != null) {
+                    lastStatus = status;
+                    if (isThrottle(status)) {
+                        throttleResponses++;
+                    } else if (status >= 500) {
+                        sawServerError = true;
+                    }
+                }
+            }
+        }
+
+        final Boolean successful = first(apiCall, 
CoreMetric.API_CALL_SUCCESSFUL, null);
+        if (allowed(API_CALL_COUNT)) {
+            apiCallCount(op, statusClass(lastStatus, successful)).inc();
+        }
+
+        if (throttleResponses > 0 && allowed(THROTTLE_COUNT)) {
+            throttleCount(op).inc(throttleResponses);
+        }
+
+        final Integer retries = first(apiCall, CoreMetric.RETRY_COUNT, 0);
+        if (retries != null && retries > 0 && allowed(RETRY_COUNT)) {
+            retryCount(op, retryReason(throttleResponses > 0, 
sawServerError)).inc(retries);
+        }
+    }
+
+    private static boolean isThrottle(int status) {
+        return status == 429 || status == 503;
+    }
+
+    private static String statusClass(Integer status, Boolean successful) {

Review Comment:
   Should/can we extract this as cloud-agnostic code?
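
   A possible shape, as a sketch only. The body of `statusClass` is not 
visible in this hunk, so the mapping below is an assumption built from the 
label set in the class Javadoc (`2xx`, `4xx`, `5xx`, `throttled`, `other`, 
`error`, `unknown`); `HttpStatusClassifier` is a hypothetical name, and the 
throttle predicate is passed in so that the provider-specific part (429/503 
for S3, per `isThrottle`) stays out of the shared code.

```java
import java.util.function.IntPredicate;

/** Hypothetical provider-neutral classifier for HTTP status codes. */
final class HttpStatusClassifier {
    /** Throttle codes taken from isThrottle in the diff above. */
    static final IntPredicate S3_THROTTLE = status -> status == 429 || status == 503;

    private HttpStatusClassifier() {}

    /**
     * Maps a final HTTP status to a closed label set. The exact mapping is an
     * assumption; only the label names come from the Javadoc.
     */
    static String statusClass(Integer status, Boolean successful, IntPredicate throttle) {
        if (status == null) {
            // No HTTP status seen: fall back to the overall success flag.
            return Boolean.FALSE.equals(successful) ? "error" : "unknown";
        }
        if (throttle.test(status)) {
            return "throttled";
        }
        if (status >= 200 && status < 300) {
            return "2xx";
        }
        if (status >= 400 && status < 500) {
            return "4xx";
        }
        if (status >= 500 && status < 600) {
            return "5xx";
        }
        return "other";
    }

    public static void main(String[] args) {
        System.out.println(statusClass(404, false, S3_THROTTLE));
    }
}
```

   With the predicate injected, another object store could reuse the 
classifier and supply its own throttle codes.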



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records 
into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+    @Test
+    void successfulCallIncrementsApiCallCountAndRecordsDuration() {

Review Comment:
    Can we parametrise these tests? For example, the tests for the different 
status_class values are very repetitive.



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records 
into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+    @Test
+    void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 
200));
+
+        
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+        Histogram histogram = 
root.histograms.get("op=PutObject/api_call_duration_ms");
+        assertThat(histogram).isNotNull();
+        assertThat(histogram.getCount()).isEqualTo(1L);
+        assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+        
assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+    }
+
+    @Test
+    void throttledCallIncrementsThrottleAndRetryCounts() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        // Two throttled attempts (503) followed by a successful one (200); 
RETRY_COUNT = 2.
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+        // The final attempt succeeded, so the overall call is classified 2xx.
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+    }
+
+    @Test
+    void clientErrorIsClassifiedAs4xx() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0, 
404));
+
+        
assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+    }
+
+    @Test
+    void allowlistRegistersOnlyTheListedMetrics() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        // Only api_call_count is allowed; duration, throttle and retry must 
be skipped.
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        
Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+    }
+
+    @Test
+    void wildcardAllowlistRegistersEveryMetric() {

Review Comment:
   I think this test can't tell the difference between `*` and a fallback to 
the default allowlist. Are we checking any metrics here that are not in the 
default allowlist?
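
   To illustrate the gap, here is a minimal stand-alone model of the allowlist 
check. `AllowlistSketch` and the metric name `bytes_transferred` are 
hypothetical. The point is only that the two configurations differ for a name 
outside the default set, so that is the kind of name the test would have to 
assert on.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the bridge's allowlist check; not the PR's implementation. */
final class AllowlistSketch {

    /** The five default names listed in the bridge's Javadoc. */
    static final List<String> DEFAULT_ALLOWLIST =
            Arrays.asList(
                    "api_call_count",
                    "api_call_duration_ms",
                    "throttle_count",
                    "retry_count",
                    "iops");

    private final boolean allowAll;
    private final Set<String> names;

    AllowlistSketch(Collection<String> configured) {
        // A "*" entry switches the filter off entirely.
        this.allowAll = configured.contains("*");
        this.names = new HashSet<>(configured);
    }

    boolean allowed(String metricName) {
        return allowAll || names.contains(metricName);
    }
}
```

   For every default name both configurations answer `true`, which is why 
assertions on `api_call_count`, `throttle_count` and `retry_count` alone cannot 
tell them apart.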



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records 
into Flink metrics. */
+class AwsSdkMetricBridgeTest {
+
+    @Test
+    void successfulCallIncrementsApiCallCountAndRecordsDuration() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 
200));
+
+        
assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L);
+        Histogram histogram = 
root.histograms.get("op=PutObject/api_call_duration_ms");
+        assertThat(histogram).isNotNull();
+        assertThat(histogram.getCount()).isEqualTo(1L);
+        assertThat(histogram.getStatistics().getMax()).isEqualTo(120L);
+        
assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count");
+    }
+
+    @Test
+    void throttledCallIncrementsThrottleAndRetryCounts() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        // Two throttled attempts (503) followed by a successful one (200); 
RETRY_COUNT = 2.
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+        // The final attempt succeeded, so the overall call is classified 2xx.
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+    }
+
+    @Test
+    void clientErrorIsClassifiedAs4xx() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root);
+
+        bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0, 
404));
+
+        
assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count");
+    }
+
+    @Test
+    void allowlistRegistersOnlyTheListedMetrics() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        // Only api_call_count is allowed; duration, throttle and retry must 
be skipped.
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        
Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count");
+        
assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count");
+    }
+
+    @Test
+    void wildcardAllowlistRegistersEveryMetric() {
+        CapturingMetricGroup root = new CapturingMetricGroup();
+        AwsSdkMetricBridge bridge =
+                new AwsSdkMetricBridge(
+                        root,
+                        Collections.singletonList("*"),
+                        S3MetricHistogram.DEFAULT_WINDOW_SIZE);
+
+        bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 
503, 503, 200));
+
+        
assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L);
+        
assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull();
+        assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L);
+        
assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L);
+    }
+
+    @Test
+    void emptyAllowlistFallsBackToDefaults() {

Review Comment:
   I think we shouldn't fall back to the defaults. If the user for some reason 
provides an empty allowlist, this should throw an error early. I don't see why 
someone would provide an empty list unless they wanted to disable all metrics 
(which should be done with the master switch).
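
   Something along these lines, as a sketch only. `AllowlistValidation` and 
the wording of the message are made up for illustration. In the PR itself the 
check could probably be a `Preconditions.checkArgument` call in the bridge 
constructor.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical fail-fast validation for the configured metric allowlist. */
final class AllowlistValidation {

    private AllowlistValidation() {}

    /** Rejects a null or empty allowlist instead of silently falling back to the defaults. */
    static Set<String> requireNonEmpty(Collection<String> configured) {
        if (configured == null || configured.isEmpty()) {
            throw new IllegalArgumentException(
                    "The metric allowlist must not be empty; "
                            + "use the metrics enabled/disabled switch to turn metrics off.");
        }
        // Defensive, read-only copy so later changes to the caller's collection have no effect.
        return Collections.unmodifiableSet(new LinkedHashSet<>(configured));
    }
}
```

   With this, a misconfiguration surfaces when the factory is configured 
rather than as unexpectedly present metrics at runtime.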



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3FileSystemFactoryMetricsTest.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.fs.s3native.NativeS3AFileSystemFactory;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that the native S3 factories register metrics under a {@code 
filesystem_type} label whose
+ * value is the factory scheme, so {@code s3://} and {@code s3a://} traffic 
remain distinguishable.
+ */
+class NativeS3FileSystemFactoryMetricsTest {
+
+    @Test
+    void s3FactoryUsesSchemeAsFilesystemTypeLabel() {

Review Comment:
   nit: this can be parametrised



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+import java.util.Arrays;
+
+/**
+ * Minimal sliding-window {@link Histogram} used by {@link AwsSdkMetricBridge} 
for {@code
+ * api_call_duration_ms}.
+ *
+ * <p>Backed by a fixed-size circular buffer holding the most recent {@code 
windowSize} samples
+ * (default {@value #DEFAULT_WINDOW_SIZE}). This bounds memory regardless of 
request volume and
+ * keeps {@link #update(long)} O(1); statistics are computed from a sorted 
snapshot of the window.
+ *
+ * <p>flink-s3-fs-native deliberately keeps a minimal dependency footprint and 
does not depend on
+ * flink-runtime, so {@code DescriptiveStatisticsHistogram} is not available; 
this is a small

Review Comment:
   > DescriptiveStatisticsHistogram
   
   I am not comfortable with duplicating chunks of code that carry 
non-trivial logic. If the dependency footprint here is a real concern, it would 
make more sense to me to extract the lightweight parts of flink-runtime into a 
separate module. 
   @rkhachatryan, WDYT on this one?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as 
the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at 
construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry 
registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code 
api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a 
closed set of ~15 values

Review Comment:
   > ~15 values
   
   That number can easily go stale.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java:
##########
@@ -640,6 +640,11 @@ public static TaskExecutor startTaskManager(
                         resourceID,
                         
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+        // Second-phase init for file system plugins that opt into metrics 
(e.g.
+        // flink-s3-fs-native): hand them the process-level metric group now 
that the
+        // MetricRegistry exists. See FileSystem#attachMetrics and 
MetricsAware.
+        FileSystem.attachMetrics(taskManagerMetricGroup.f0);

Review Comment:
   +1



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -341,6 +389,46 @@ public void configure(Configuration config) {
         this.bucketConfigProvider = new BucketConfigProvider(config);
     }
 
+    @Override
+    public synchronized void setMetricGroup(MetricGroup metricGroup) {
+        // filesystem_type label value is the scheme ("s3" / "s3a"). This is 
deliberate: s3:// and
+        // s3a:// are served by separate factory instances, so keeping the 
scheme as the label value
+        // lets their traffic be told apart, and sibling FS plugins register 
the same label key with
+        // their own scheme. May be called more than once (see MetricsAware); 
reset the cached
+        // bridge
+        // so a re-attach with a different group re-scopes metrics created 
afterwards.
+        this.pluginMetrics = metricGroup.addGroup("filesystem_type", 
getScheme());

Review Comment:
   Is `type` here replaced by the scheme, or is the scheme just appended, as in 
"filesystem_type_s3"?



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test proving that real S3 operations performed through {@code 
NativeS3FileSystem} are
+ * translated into Flink metrics by {@link AwsSdkMetricBridge} and become 
visible in a real Flink
+ * metric registry.
+ *
+ * <p>Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with 
synthesized SDK records
+ * and a fake {@code MetricGroup}), this test exercises the full chain: the 
AWS SDK actually invokes
+ * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after 
each completed API
+ * call, the bridge registers and updates {@link Counter}/{@link Histogram} 
handles, and the
+ * assertions read those handles back through {@link MetricListener}'s real 
{@code MetricRegistry}.
+ *
+ * <p>Assertions use only GET/HEAD/LIST round trips, which carry no request 
body and are therefore
+ * unaffected by the request-checksum behaviour newer AWS SDK versions apply 
to {@code PutObject}.
+ *
+ * <p>Requires Docker; auto-skipped when Docker is unavailable.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class NativeS3MetricsEmissionITCase {
+
+    private static final String MINIO_IMAGE = 
"minio/minio:RELEASE.2022-02-07T08-17-33Z";
+    private static final int MINIO_PORT = 9000;
+    private static final String ACCESS_KEY = "metricsAccessKey";
+    private static final String SECRET_KEY = "metricsSecretKey";
+    private static final String BUCKET = "flip576-metrics";
+
+    @Container
+    private static final GenericContainer<?> MINIO =
+            new GenericContainer<>(MINIO_IMAGE)
+                    .withEnv("MINIO_ROOT_USER", ACCESS_KEY)
+                    .withEnv("MINIO_ROOT_PASSWORD", SECRET_KEY)
+                    .withCommand("server", "/data")
+                    .withExposedPorts(MINIO_PORT)
+                    .waitingFor(
+                            Wait.forHttp("/minio/health/ready")
+                                    .forPort(MINIO_PORT)
+                                    
.withStartupTimeout(Duration.ofMinutes(2)));
+
+    private static String endpoint() {
+        return String.format("http://%s:%d", MINIO.getHost(), 
MINIO.getMappedPort(MINIO_PORT));
+    }
+
+    @BeforeAll
+    static void createBucket() {
+        try (S3Client client =
+                S3Client.builder()
+                        .endpointOverride(URI.create(endpoint()))
+                        .region(Region.US_EAST_1)
+                        .credentialsProvider(
+                                StaticCredentialsProvider.create(
+                                        AwsBasicCredentials.create(ACCESS_KEY, 
SECRET_KEY)))
+                        .serviceConfiguration(
+                                
S3Configuration.builder().pathStyleAccessEnabled(true).build())
+                        .build()) {
+            
client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
+        }
+    }
+
+    @Test
+    void realS3OperationsEmitFlinkMetrics() throws Exception {
+        Configuration config = new Configuration();
+        config.set(NativeS3FileSystemFactory.ENDPOINT, endpoint());
+        config.set(NativeS3FileSystemFactory.ACCESS_KEY, ACCESS_KEY);
+        config.set(NativeS3FileSystemFactory.SECRET_KEY, SECRET_KEY);
+        config.set(NativeS3FileSystemFactory.REGION, "us-east-1");
+        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+        config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false);
+        config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, 
false);
+        config.set(NativeS3FileSystemFactory.METRICS_ENABLED, true);
+
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        factory.configure(config);
+
+        MetricListener metricListener = new MetricListener();
+        // Mirror what FileSystem#attachMetrics hands to the factory: the 
"filesystem" child of the
+        // process-level group.
+        MetricGroup fsGroup = 
metricListener.getMetricGroup().addGroup("filesystem");
+        factory.setMetricGroup(fsGroup);
+
+        FileSystem fs = factory.create(URI.create("s3://" + BUCKET + "/"));

Review Comment:
   Why only s3 and not s3a too? (Maybe parametrise?)



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogramTest.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.metrics.HistogramStatistics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3MetricHistogram}. */
+class S3MetricHistogramTest {

Review Comment:
   If we used the existing histogram implementation, we wouldn't need this 
test.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+import software.amazon.awssdk.metrics.SdkMetric;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code 
flink-s3-fs-native}.
+ *
+ * <p>The SDK invokes {@link #publish(MetricCollection)} asynchronously after 
every completed API
+ * call, on its internal completion executor. The bridge reads a small, fixed 
set of fields and
+ * emits the default metric surface of FLIP-576 against the {@code 
filesystem_type}-labelled scope
+ * it is handed at construction:
+ *
+ * <ul>
+ *   <li>{@code api_call_count} (Counter) — labels {@code op}, {@code 
status_class}
+ *   <li>{@code api_call_duration_ms} (Histogram) — label {@code op}
+ *   <li>{@code throttle_count} (Counter) — label {@code op}
+ *   <li>{@code retry_count} (Counter) — labels {@code op}, {@code reason}
+ * </ul>
+ *
+ * <p>({@code iops} from the default allowlist is derived at reporter time as the rate of {@code
+ * api_call_count}, so it is not a separately registered metric.)
+ *
+ * <p><b>Allowlist.</b> Only metrics whose name is in the allowlist passed at construction are
+ * registered; the rest are skipped on the hot path. A {@code "*"} entry registers everything. The
+ * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code api_call_duration_ms},
+ * {@code throttle_count}, {@code retry_count}, {@code iops}).
+ *
+ * <p><b>Cardinality.</b> {@code op} comes from the SDK operation name (a closed set of ~15 values
+ * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 4xx}, {@code 5xx},
+ * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and {@code reason} is a closed
+ * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are cached in bounded maps,
+ * so {@link #publish} is a map lookup plus a counter increment with no per-record allocation.
+ *
+ * <p><b>Thread-safety.</b> Counters use {@link ThreadSafeSimpleCounter} and the histogram is
+ * synchronized, so concurrent publishes from the SDK completion executor are safe. The bridge is
+ * also safe to share across multiple S3 clients of the same plugin instance.
+ */
+@Internal
+public final class AwsSdkMetricBridge implements MetricPublisher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AwsSdkMetricBridge.class);
+
+    static final String API_CALL_COUNT = "api_call_count";
+    static final String API_CALL_DURATION_MS = "api_call_duration_ms";
+    static final String THROTTLE_COUNT = "throttle_count";
+    static final String RETRY_COUNT = "retry_count";
+    static final String IOPS = "iops";
+
+    /** The default-on metric set from FLIP-576. {@code iops} is derived, not registered. */
+    static final List<String> DEFAULT_ALLOWLIST =
+            Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, THROTTLE_COUNT, RETRY_COUNT, IOPS);
+
+    private static final String WILDCARD = "*";
+
+    private static final String LABEL_OP = "op";
+    private static final String LABEL_STATUS_CLASS = "status_class";
+    private static final String LABEL_REASON = "reason";
+
+    private static final String UNKNOWN_OP = "Unknown";
+
+    private final MetricGroup fsScope;
+    private final int histogramWindowSize;
+
+    private final boolean allowAll;
+    private final Set<String> allowlist;
+
+    // op and label sets are closed, so these maps are bounded by construction.
+    private final ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();

Review Comment:
   nit: could this field be declared as `Map` rather than the concrete `ConcurrentHashMap`?
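
   A rough sketch of what the nit above seems to suggest, under stated assumptions: `CounterCacheSketch`, `increment`, and `count` are hypothetical names, and `LongAdder` stands in for Flink's `Counter` so the snippet compiles on its own. The field is declared by interface type while the instance stays a `ConcurrentHashMap`, so `computeIfAbsent` remains atomic.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.LongAdder;

   // Hypothetical stand-in for the bridge's counter cache; not the PR's code.
   public class CounterCacheSketch {

       // Declared as Map; the runtime type is still ConcurrentHashMap,
       // whose computeIfAbsent override is atomic per key.
       private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

       // Looks up (or lazily creates) the counter for a key and increments it.
       public void increment(String key) {
           counters.computeIfAbsent(key, k -> new LongAdder()).increment();
       }

       // Returns the current count for a key, or 0 if the key was never seen.
       public long count(String key) {
           LongAdder adder = counters.get(key);
           return adder == null ? 0L : adder.sum();
       }
   }
   ```

   One trade-off: a plain `Map` type no longer signals the concurrency guarantee at the declaration site, so `ConcurrentMap` might be a reasonable middle ground if that matters here.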



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.fs.s3native.NativeS3FileSystemFactory;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test proving that real S3 operations performed through {@code NativeS3FileSystem} are
+ * translated into Flink metrics by {@link AwsSdkMetricBridge} and become visible in a real Flink
+ * metric registry.
+ *
+ * <p>Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with synthesized SDK records
+ * and a fake {@code MetricGroup}), this test exercises the full chain: the AWS SDK actually invokes
+ * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after each completed API
+ * call, the bridge registers and updates {@link Counter}/{@link Histogram} handles, and the
+ * assertions read those handles back through {@link MetricListener}'s real {@code MetricRegistry}.
+ *
+ * <p>Assertions use only GET/HEAD/LIST round trips, which carry no request body and are therefore
+ * unaffected by the request-checksum behaviour newer AWS SDK versions apply to {@code PutObject}.
+ *
+ * <p>Requires Docker; auto-skipped when Docker is unavailable.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class NativeS3MetricsEmissionITCase {
+
+    private static final String MINIO_IMAGE = "minio/minio:RELEASE.2022-02-07T08-17-33Z";
+    private static final int MINIO_PORT = 9000;

Review Comment:
   I thought we had agreed earlier to avoid MinIO-based tests because of its licence change and because it is in maintenance mode. Do we want to keep this dependency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
