This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d7cb799451 Flink: Backport removal of optional 
flink-metrics-dropwizard dependency to v2.0 and v1.20 (#16230)
d7cb799451 is described below

commit d7cb7994510c08d2b55352195773763d93243d9c
Author: Kevin Liu <[email protected]>
AuthorDate: Wed May 6 12:32:43 2026 -0400

    Flink: Backport removal of optional flink-metrics-dropwizard dependency to 
v2.0 and v1.20 (#16230)
---
 flink/v1.20/build.gradle                           |   3 -
 flink/v1.20/flink-runtime/LICENSE                  |  16 ---
 flink/v1.20/flink-runtime/runtime-deps.txt         |   2 -
 .../flink/sink/IcebergStreamWriterMetrics.java     | 109 ++++++++++++++++-----
 .../flink/sink/TestIcebergStreamWriterMetrics.java |  42 ++++++++
 flink/v2.0/build.gradle                            |   3 -
 flink/v2.0/flink-runtime/LICENSE                   |  16 ---
 flink/v2.0/flink-runtime/runtime-deps.txt          |   2 -
 .../flink/sink/IcebergStreamWriterMetrics.java     | 109 ++++++++++++++++-----
 .../flink/sink/TestIcebergStreamWriterMetrics.java |  42 ++++++++
 10 files changed, 254 insertions(+), 90 deletions(-)

diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index 2bbad1891c..c7ca24817b 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -169,9 +169,6 @@ 
project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
-    // To support dropwizard histogram metrics (not shipped by Flink by 
default)
-    implementation libs.flink120.metrics.dropwizard
-
     // for integration testing with the flink-runtime-jar
     // all of those dependencies are required because the integration test 
extends FlinkTestBase
     integrationCompileOnly project(':iceberg-api')
diff --git a/flink/v1.20/flink-runtime/LICENSE 
b/flink/v1.20/flink-runtime/LICENSE
index 11460c3307..364652a5ac 100644
--- a/flink/v1.20/flink-runtime/LICENSE
+++ b/flink/v1.20/flink-runtime/LICENSE
@@ -460,22 +460,6 @@ License: Apache License, Version 2.0 - 
https://www.apache.org/licenses/LICENSE-2
 
 
--------------------------------------------------------------------------------
 
-This product bundles Dropwizard Metrics.
-
-Copyright: (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
-Project URL: https://github.com/dropwizard/metrics
-License: Apache License, Version 2.0 - 
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
-This product bundles Apache Flink's optional support for Dropwizard Metrics.
-
-Copyright: 2014-2026 The Apache Software Foundation
-Project URL: https://flink.apache.org/
-License: Apache License, Version 2.0 - 
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
 This product bundles RoaringBitmap.
 
 Copyright: (c) 2013-... the RoaringBitmap authors
diff --git a/flink/v1.20/flink-runtime/runtime-deps.txt 
b/flink/v1.20/flink-runtime/runtime-deps.txt
index 7c7aed1e43..00c53ed388 100644
--- a/flink/v1.20/flink-runtime/runtime-deps.txt
+++ b/flink/v1.20/flink-runtime/runtime-deps.txt
@@ -6,11 +6,9 @@ com.github.luben:zstd-jni:1.5.7-3
 com.google.errorprone:error_prone_annotations:2.10.0
 dev.failsafe:failsafe:3.3.2
 io.airlift:aircompressor:2.0.3
-io.dropwizard.metrics:metrics-core:3.2.6
 org.apache.avro:avro:1.12.1
 org.apache.datasketches:datasketches-java:6.2.0
 org.apache.datasketches:datasketches-memory:3.0.2
-org.apache.flink:flink-metrics-dropwizard:1.20.1
 org.apache.httpcomponents.client5:httpclient5:5.6
 org.apache.httpcomponents.core5:httpcore5-h2:5.4
 org.apache.httpcomponents.core5:httpcore5:5.4
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
index 434f396957..6cf15ff713 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
@@ -18,23 +18,33 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import com.codahale.metrics.SlidingWindowReservoir;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.util.ScanTaskUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Internal
 public class IcebergStreamWriterMetrics {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergStreamWriterMetrics.class);
+
   // 1,024 reservoir size should cost about 8KB, which is quite small.
   // It should also produce good accuracy for histogram distribution (like 
percentiles).
   private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
 
+  // Histogram metrics loaded through Flink's optional 
flink-metrics-dropwizard dependency.
+  // Will be null if not available.
+  private static final DropwizardCtors DROPWIZARD = loadDropwizardCtors();
+
   private final Counter flushedDataFiles;
   private final Counter flushedDeleteFiles;
   private final Counter flushedReferencedDataFiles;
@@ -51,18 +61,8 @@ public class IcebergStreamWriterMetrics {
     this.lastFlushDurationMs = new AtomicLong();
     writerMetrics.gauge("lastFlushDurationMs", lastFlushDurationMs::get);
 
-    com.codahale.metrics.Histogram dropwizardDataFilesSizeHistogram =
-        new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
-    this.dataFilesSizeHistogram =
-        writerMetrics.histogram(
-            "dataFilesSizeHistogram",
-            new DropwizardHistogramWrapper(dropwizardDataFilesSizeHistogram));
-    com.codahale.metrics.Histogram dropwizardDeleteFilesSizeHistogram =
-        new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
-    this.deleteFilesSizeHistogram =
-        writerMetrics.histogram(
-            "deleteFilesSizeHistogram",
-            new 
DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram));
+    this.dataFilesSizeHistogram = registerHistogram(writerMetrics, 
"dataFilesSizeHistogram");
+    this.deleteFilesSizeHistogram = registerHistogram(writerMetrics, 
"deleteFilesSizeHistogram");
   }
 
   public void updateFlushResult(WriteResult result) {
@@ -74,16 +74,21 @@ public class IcebergStreamWriterMetrics {
     // This should works equally well and we avoided the overhead of tracking 
the list of file sizes
     // in the {@link CommitSummary}, which currently stores simple stats for 
counters and gauges
     // metrics.
-    Arrays.stream(result.dataFiles())
-        .forEach(
-            dataFile -> {
-              dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
-            });
-    Arrays.stream(result.deleteFiles())
-        .forEach(
-            deleteFile -> {
-              
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
-            });
+    if (dataFilesSizeHistogram != null) {
+      Arrays.stream(result.dataFiles())
+          .forEach(
+              dataFile -> {
+                dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
+              });
+    }
+
+    if (deleteFilesSizeHistogram != null) {
+      Arrays.stream(result.deleteFiles())
+          .forEach(
+              deleteFile -> {
+                
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
+              });
+    }
   }
 
   public void flushDuration(long flushDurationMs) {
@@ -97,4 +102,60 @@ public class IcebergStreamWriterMetrics {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  private static Histogram registerHistogram(MetricGroup group, String name) {
+    Histogram histogram = newDropwizardHistogram();
+    return histogram != null ? group.histogram(name, histogram) : null;
+  }
+
+  private static Histogram newDropwizardHistogram() {
+    if (DROPWIZARD == null) {
+      return null;
+    }
+
+    Object reservoir = 
DROPWIZARD.reservoirCtor.newInstance(HISTOGRAM_RESERVOIR_SIZE);
+    Object codahaleHistogram = DROPWIZARD.histogramCtor.newInstance(reservoir);
+    return DROPWIZARD.wrapperCtor.newInstance(codahaleHistogram);
+  }
+
+  private static DropwizardCtors loadDropwizardCtors() {
+    try {
+      Class<?> reservoirInterface =
+          
DynClasses.builder().impl("com.codahale.metrics.Reservoir").buildChecked();
+      Class<?> codahaleHistogramClass =
+          
DynClasses.builder().impl("com.codahale.metrics.Histogram").buildChecked();
+      return new DropwizardCtors(
+          DynConstructors.builder()
+              .impl("com.codahale.metrics.SlidingWindowReservoir", int.class)
+              .buildChecked(),
+          DynConstructors.builder()
+              .impl("com.codahale.metrics.Histogram", reservoirInterface)
+              .buildChecked(),
+          DynConstructors.builder(Histogram.class)
+              .impl(
+                  
"org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper",
+                  codahaleHistogramClass)
+              .buildChecked());
+    } catch (ClassNotFoundException | NoSuchMethodException e) {
+      LOG.warn(
+          "Cannot load Dropwizard metrics; is 
org.apache.flink:flink-metrics-dropwizard on the classpath?",
+          e);
+      return null;
+    }
+  }
+
+  private record DropwizardCtors(
+      DynConstructors.Ctor<?> reservoirCtor,
+      DynConstructors.Ctor<?> histogramCtor,
+      DynConstructors.Ctor<Histogram> wrapperCtor) {}
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
new file mode 100644
index 0000000000..42bbfc0d36
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.iceberg.io.WriteResult;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergStreamWriterMetrics {
+
+  @Test
+  void histogramsCreatedWhenDropwizardAvailable() {
+    IcebergStreamWriterMetrics metrics =
+        new IcebergStreamWriterMetrics(
+            UnregisteredMetricsGroup.createSinkWriterMetricGroup(), 
"db.table");
+
+    assertThat(metrics.dataFilesSizeHistogram()).isNotNull();
+    assertThat(metrics.deleteFilesSizeHistogram()).isNotNull();
+
+    assertThatNoException()
+        .isThrownBy(() -> 
metrics.updateFlushResult(WriteResult.builder().build()));
+  }
+}
diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle
index 626cc01b28..94f851e032 100644
--- a/flink/v2.0/build.gradle
+++ b/flink/v2.0/build.gradle
@@ -169,9 +169,6 @@ 
project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
-    // To support dropwizard histogram metrics (not shipped by Flink by 
default)
-    implementation libs.flink20.metrics.dropwizard
-
     // for integration testing with the flink-runtime-jar
     // all of those dependencies are required because the integration test 
extends FlinkTestBase
     integrationCompileOnly project(':iceberg-api')
diff --git a/flink/v2.0/flink-runtime/LICENSE b/flink/v2.0/flink-runtime/LICENSE
index 11460c3307..364652a5ac 100644
--- a/flink/v2.0/flink-runtime/LICENSE
+++ b/flink/v2.0/flink-runtime/LICENSE
@@ -460,22 +460,6 @@ License: Apache License, Version 2.0 - 
https://www.apache.org/licenses/LICENSE-2
 
 
--------------------------------------------------------------------------------
 
-This product bundles Dropwizard Metrics.
-
-Copyright: (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
-Project URL: https://github.com/dropwizard/metrics
-License: Apache License, Version 2.0 - 
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
-This product bundles Apache Flink's optional support for Dropwizard Metrics.
-
-Copyright: 2014-2026 The Apache Software Foundation
-Project URL: https://flink.apache.org/
-License: Apache License, Version 2.0 - 
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
 This product bundles RoaringBitmap.
 
 Copyright: (c) 2013-... the RoaringBitmap authors
diff --git a/flink/v2.0/flink-runtime/runtime-deps.txt 
b/flink/v2.0/flink-runtime/runtime-deps.txt
index c70e3fbba9..00c53ed388 100644
--- a/flink/v2.0/flink-runtime/runtime-deps.txt
+++ b/flink/v2.0/flink-runtime/runtime-deps.txt
@@ -6,11 +6,9 @@ com.github.luben:zstd-jni:1.5.7-3
 com.google.errorprone:error_prone_annotations:2.10.0
 dev.failsafe:failsafe:3.3.2
 io.airlift:aircompressor:2.0.3
-io.dropwizard.metrics:metrics-core:3.2.6
 org.apache.avro:avro:1.12.1
 org.apache.datasketches:datasketches-java:6.2.0
 org.apache.datasketches:datasketches-memory:3.0.2
-org.apache.flink:flink-metrics-dropwizard:2.0.0
 org.apache.httpcomponents.client5:httpclient5:5.6
 org.apache.httpcomponents.core5:httpcore5-h2:5.4
 org.apache.httpcomponents.core5:httpcore5:5.4
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
index 434f396957..6cf15ff713 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
@@ -18,23 +18,33 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import com.codahale.metrics.SlidingWindowReservoir;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.util.ScanTaskUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Internal
 public class IcebergStreamWriterMetrics {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergStreamWriterMetrics.class);
+
   // 1,024 reservoir size should cost about 8KB, which is quite small.
   // It should also produce good accuracy for histogram distribution (like 
percentiles).
   private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
 
+  // Histogram metrics loaded through Flink's optional 
flink-metrics-dropwizard dependency.
+  // Will be null if not available.
+  private static final DropwizardCtors DROPWIZARD = loadDropwizardCtors();
+
   private final Counter flushedDataFiles;
   private final Counter flushedDeleteFiles;
   private final Counter flushedReferencedDataFiles;
@@ -51,18 +61,8 @@ public class IcebergStreamWriterMetrics {
     this.lastFlushDurationMs = new AtomicLong();
     writerMetrics.gauge("lastFlushDurationMs", lastFlushDurationMs::get);
 
-    com.codahale.metrics.Histogram dropwizardDataFilesSizeHistogram =
-        new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
-    this.dataFilesSizeHistogram =
-        writerMetrics.histogram(
-            "dataFilesSizeHistogram",
-            new DropwizardHistogramWrapper(dropwizardDataFilesSizeHistogram));
-    com.codahale.metrics.Histogram dropwizardDeleteFilesSizeHistogram =
-        new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
-    this.deleteFilesSizeHistogram =
-        writerMetrics.histogram(
-            "deleteFilesSizeHistogram",
-            new 
DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram));
+    this.dataFilesSizeHistogram = registerHistogram(writerMetrics, 
"dataFilesSizeHistogram");
+    this.deleteFilesSizeHistogram = registerHistogram(writerMetrics, 
"deleteFilesSizeHistogram");
   }
 
   public void updateFlushResult(WriteResult result) {
@@ -74,16 +74,21 @@ public class IcebergStreamWriterMetrics {
     // This should works equally well and we avoided the overhead of tracking 
the list of file sizes
     // in the {@link CommitSummary}, which currently stores simple stats for 
counters and gauges
     // metrics.
-    Arrays.stream(result.dataFiles())
-        .forEach(
-            dataFile -> {
-              dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
-            });
-    Arrays.stream(result.deleteFiles())
-        .forEach(
-            deleteFile -> {
-              
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
-            });
+    if (dataFilesSizeHistogram != null) {
+      Arrays.stream(result.dataFiles())
+          .forEach(
+              dataFile -> {
+                dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
+              });
+    }
+
+    if (deleteFilesSizeHistogram != null) {
+      Arrays.stream(result.deleteFiles())
+          .forEach(
+              deleteFile -> {
+                
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
+              });
+    }
   }
 
   public void flushDuration(long flushDurationMs) {
@@ -97,4 +102,60 @@ public class IcebergStreamWriterMetrics {
   public Counter getFlushedDeleteFiles() {
     return flushedDeleteFiles;
   }
+
+  @VisibleForTesting
+  Histogram dataFilesSizeHistogram() {
+    return dataFilesSizeHistogram;
+  }
+
+  @VisibleForTesting
+  Histogram deleteFilesSizeHistogram() {
+    return deleteFilesSizeHistogram;
+  }
+
+  private static Histogram registerHistogram(MetricGroup group, String name) {
+    Histogram histogram = newDropwizardHistogram();
+    return histogram != null ? group.histogram(name, histogram) : null;
+  }
+
+  private static Histogram newDropwizardHistogram() {
+    if (DROPWIZARD == null) {
+      return null;
+    }
+
+    Object reservoir = 
DROPWIZARD.reservoirCtor.newInstance(HISTOGRAM_RESERVOIR_SIZE);
+    Object codahaleHistogram = DROPWIZARD.histogramCtor.newInstance(reservoir);
+    return DROPWIZARD.wrapperCtor.newInstance(codahaleHistogram);
+  }
+
+  private static DropwizardCtors loadDropwizardCtors() {
+    try {
+      Class<?> reservoirInterface =
+          
DynClasses.builder().impl("com.codahale.metrics.Reservoir").buildChecked();
+      Class<?> codahaleHistogramClass =
+          
DynClasses.builder().impl("com.codahale.metrics.Histogram").buildChecked();
+      return new DropwizardCtors(
+          DynConstructors.builder()
+              .impl("com.codahale.metrics.SlidingWindowReservoir", int.class)
+              .buildChecked(),
+          DynConstructors.builder()
+              .impl("com.codahale.metrics.Histogram", reservoirInterface)
+              .buildChecked(),
+          DynConstructors.builder(Histogram.class)
+              .impl(
+                  
"org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper",
+                  codahaleHistogramClass)
+              .buildChecked());
+    } catch (ClassNotFoundException | NoSuchMethodException e) {
+      LOG.warn(
+          "Cannot load Dropwizard metrics; is 
org.apache.flink:flink-metrics-dropwizard on the classpath?",
+          e);
+      return null;
+    }
+  }
+
+  private record DropwizardCtors(
+      DynConstructors.Ctor<?> reservoirCtor,
+      DynConstructors.Ctor<?> histogramCtor,
+      DynConstructors.Ctor<Histogram> wrapperCtor) {}
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
new file mode 100644
index 0000000000..42bbfc0d36
--- /dev/null
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.iceberg.io.WriteResult;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergStreamWriterMetrics {
+
+  @Test
+  void histogramsCreatedWhenDropwizardAvailable() {
+    IcebergStreamWriterMetrics metrics =
+        new IcebergStreamWriterMetrics(
+            UnregisteredMetricsGroup.createSinkWriterMetricGroup(), 
"db.table");
+
+    assertThat(metrics.dataFilesSizeHistogram()).isNotNull();
+    assertThat(metrics.deleteFilesSizeHistogram()).isNotNull();
+
+    assertThatNoException()
+        .isThrownBy(() -> 
metrics.updateFlushResult(WriteResult.builder().build()));
+  }
+}

Reply via email to