Samrat002 opened a new pull request, #28427:
URL: https://github.com/apache/flink/pull/28427

   ## What is the purpose of the change
   
   `flink-s3-fs-native` (the Hadoop-free, AWS SDK v2–based S3 filesystem) 
currently emits no metrics. When a job's checkpoints, savepoints, or sinks go 
through it, operators have no visibility into how Flink is actually talking to 
S3 request volume, latency, throttling, or retries, which makes diagnosing slow 
or failing checkpoints largely guesswork.
   
   This change makes the native S3 filesystem report **operation-level S3 
metrics** into Flink's metric system. It does so by bridging the AWS SDK's 
built-in metrics SPI into Flink `Counter`/`Histogram` instruments, so every 
completed S3 API call is counted, timed, and classified.
   
   To get there it also adds a small, generic piece of plumbing in 
`flink-core`/`flink-runtime`: filesystem plugins are classloader-isolated and 
are created before (and independently of) the metric registry, so there was no 
existing way to hand a `MetricGroup` to a filesystem. A new opt-in capability 
interface (`MetricsAware`) plus a `FileSystem.attachMetrics(...)` hook closes 
that gap, and is reusable by any future filesystem that wants to publish 
metrics.
   
   ## Brief change log
   
   **flink-core**
   - Add `MetricsAware` (`@PublicEvolving`, `org.apache.flink.core.plugin`): a 
`FileSystemFactory` implements it to be handed a `MetricGroup`.
   - Add `FileSystem.attachMetrics(MetricGroup)` (`@Internal`): creates a 
`filesystem` child group and forwards it to every registered `MetricsAware` 
factory; resilient to a misbehaving factory and idempotent.
   - `PluginFileSystemFactory` now implements `MetricsAware` and forwards 
`setMetricGroup` to the wrapped inner factory under the plugin classloader. 
Without this, plugin-loaded filesystems (the normal deployment mode) would 
silently never receive the group.
   
   **flink-runtime**
   - `ClusterEntrypoint` (JobManager) and `TaskManagerRunner` (TaskManager) 
call `FileSystem.attachMetrics(processMetricGroup)` during startup. The 
`ClusterEntrypoint` service-init order was adjusted so this runs before HA/blob 
services cache filesystem clients, otherwise those early clients would be 
created without a metric group.
   
   **flink-s3-fs-native**
   - `NativeS3FileSystemFactory` / `NativeS3AFileSystemFactory` implement 
`MetricsAware` and tag metrics with a `filesystem_type` label set to the scheme 
(`s3` vs `s3a`), so the two stay distinguishable.
   - `AwsSdkMetricBridge` implements 
`software.amazon.awssdk.metrics.MetricPublisher` and translates each 
`MetricCollection` into Flink metrics: `api_call_count` (labels: `op`, 
`status_class`), `api_call_duration_ms` (histogram, label `op`), 
`throttle_count` (label `op`), `retry_count` (labels: `op`, `reason`).
   - `S3MetricHistogram`: a bounded sliding-window histogram backing the 
duration metric.
   - `S3ClientProvider` registers the publisher on the sync/async clients.
   - New config options: `s3.metrics.enabled` (off by default), 
`s3.metrics.allowlist`, `s3.metrics.histogram.window-size`.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows.
   
   **Automated tests**
   - `AwsSdkMetricBridgeTest` — translation of SDK records to Flink metrics; 
`status_class` classification (2xx/4xx/5xx/throttled); retry attribution; 
allowlist behavior (explicit list, `*` wildcard, empty → defaults).
   - `S3MetricHistogramTest` — sliding-window statistics.
   - `NativeS3FileSystemFactoryMetricsTest` — the `filesystem_type` label 
resolves to `s3` / `s3a` per factory.
   - `FileSystemAttachMetricsTest` (flink-core) — `attachMetrics` unwraps 
`PluginFileSystemFactory` to reach the real factory, skips non-`MetricsAware` 
factories, survives a throwing factory, and is idempotent.
   - `NativeS3MetricsEmissionITCase` — MinIO via Testcontainers; real 
GET/HEAD/LIST round trips, asserting the counters/histograms are readable back 
through a real `MetricRegistry` (`MetricListener`). Auto-skips without Docker.
   
   **Manual end-to-end against real AWS S3**
   
   I ran a standalone cluster built from this branch with `s3.metrics.enabled: 
true` and the SLF4J reporter, and submitted a large-state streaming job 
checkpointing to `s3://<bucket>/checkpoints` (HashMap backend, filesystem 
checkpoint storage, 10 s interval). The native plugin loaded (`Plugin loader 
... s3-fs-native`), built its client via the SDK default credential chain, and 
wrote real checkpoint objects to S3. The reporter then showed the metrics on 
both the TaskManager (data-plane writes) and the JobManager (checkpoint 
coordination / multipart):
   
   ```
   # TaskManager
   
...filesystem.filesystem_type.s3.op.PutObject.status_class.2xx.api_call_count: 
31
   ...filesystem.filesystem_type.s3.op.PutObject.api_call_duration_ms: 
count=31, min=343, max=8329, mean=3383.2, p99=8329.0
   
...filesystem.filesystem_type.s3.op.ListObjectsV2.status_class.2xx.api_call_count:
 31
   ...filesystem.filesystem_type.s3.op.ListObjectsV2.reason.other.retry_count: 1
   
...filesystem.filesystem_type.s3.op.HeadObject.status_class.4xx.api_call_count: 
25
   
   # JobManager
   
...filesystem.filesystem_type.s3.op.CreateMultipartUpload.status_class.2xx.api_call_count:
 25
   
...filesystem.filesystem_type.s3.op.UploadPart.status_class.2xx.api_call_count: 
25
   
...filesystem.filesystem_type.s3.op.CompleteMultipartUpload.status_class.2xx.api_call_count:
 25
   
...filesystem.filesystem_type.s3.op.DeleteObject.status_class.2xx.api_call_count:
 48
   
...filesystem.filesystem_type.s3.op.HeadObject.status_class.2xx.api_call_count: 
48
   ```
   
   This confirms the full SDK metric surface (counters, duration histograms 
with percentiles, `status_class`/`reason` labels, multipart lifecycle) flows 
through to Flink's registry on a real job. Note these are registered at the 
JM/TM **process** scope (not per operator), since that is where the filesystem 
lives.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change the checkbox below to `[X]` followed by the name of the tool, and 
uncomment the
   "Generated-by" line. See the ASF Generative Tooling Guidance for details:
   https://www.apache.org/legal/generative-tooling.html
   
   You are responsible for the quality and correctness of every change in this 
PR
   regardless of the tooling used. Low-effort AI-generated PRs will be closed. 
See
   AGENTS.md for the full guidance.
   -->
   
   - [ ] Yes (please specify the tool below)
   
   <!--
   Generated-by: [Tool Name and Version]
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to