This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d7cb799451 Flink: Backport removal of optional
flink-metrics-dropwizard dependency to v2.0 and v1.20 (#16230)
d7cb799451 is described below
commit d7cb7994510c08d2b55352195773763d93243d9c
Author: Kevin Liu <[email protected]>
AuthorDate: Wed May 6 12:32:43 2026 -0400
Flink: Backport removal of optional flink-metrics-dropwizard dependency to
v2.0 and v1.20 (#16230)
---
flink/v1.20/build.gradle | 3 -
flink/v1.20/flink-runtime/LICENSE | 16 ---
flink/v1.20/flink-runtime/runtime-deps.txt | 2 -
.../flink/sink/IcebergStreamWriterMetrics.java | 109 ++++++++++++++++-----
.../flink/sink/TestIcebergStreamWriterMetrics.java | 42 ++++++++
flink/v2.0/build.gradle | 3 -
flink/v2.0/flink-runtime/LICENSE | 16 ---
flink/v2.0/flink-runtime/runtime-deps.txt | 2 -
.../flink/sink/IcebergStreamWriterMetrics.java | 109 ++++++++++++++++-----
.../flink/sink/TestIcebergStreamWriterMetrics.java | 42 ++++++++
10 files changed, 254 insertions(+), 90 deletions(-)
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index 2bbad1891c..c7ca24817b 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -169,9 +169,6 @@
project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
- // To support dropwizard histogram metrics (not shipped by Flink by
default)
- implementation libs.flink120.metrics.dropwizard
-
// for integration testing with the flink-runtime-jar
// all of those dependencies are required because the integration test
extends FlinkTestBase
integrationCompileOnly project(':iceberg-api')
diff --git a/flink/v1.20/flink-runtime/LICENSE
b/flink/v1.20/flink-runtime/LICENSE
index 11460c3307..364652a5ac 100644
--- a/flink/v1.20/flink-runtime/LICENSE
+++ b/flink/v1.20/flink-runtime/LICENSE
@@ -460,22 +460,6 @@ License: Apache License, Version 2.0 -
https://www.apache.org/licenses/LICENSE-2
--------------------------------------------------------------------------------
-This product bundles Dropwizard Metrics.
-
-Copyright: (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
-Project URL: https://github.com/dropwizard/metrics
-License: Apache License, Version 2.0 -
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
-This product bundles Apache Flink's optional support for Dropwizard Metrics.
-
-Copyright: 2014-2026 The Apache Software Foundation
-Project URL: https://flink.apache.org/
-License: Apache License, Version 2.0 -
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
This product bundles RoaringBitmap.
Copyright: (c) 2013-... the RoaringBitmap authors
diff --git a/flink/v1.20/flink-runtime/runtime-deps.txt
b/flink/v1.20/flink-runtime/runtime-deps.txt
index 7c7aed1e43..00c53ed388 100644
--- a/flink/v1.20/flink-runtime/runtime-deps.txt
+++ b/flink/v1.20/flink-runtime/runtime-deps.txt
@@ -6,11 +6,9 @@ com.github.luben:zstd-jni:1.5.7-3
com.google.errorprone:error_prone_annotations:2.10.0
dev.failsafe:failsafe:3.3.2
io.airlift:aircompressor:2.0.3
-io.dropwizard.metrics:metrics-core:3.2.6
org.apache.avro:avro:1.12.1
org.apache.datasketches:datasketches-java:6.2.0
org.apache.datasketches:datasketches-memory:3.0.2
-org.apache.flink:flink-metrics-dropwizard:1.20.1
org.apache.httpcomponents.client5:httpclient5:5.6
org.apache.httpcomponents.core5:httpcore5-h2:5.4
org.apache.httpcomponents.core5:httpcore5:5.4
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
index 434f396957..6cf15ff713 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
@@ -18,23 +18,33 @@
*/
package org.apache.iceberg.flink.sink;
-import com.codahale.metrics.SlidingWindowReservoir;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.ScanTaskUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Internal
public class IcebergStreamWriterMetrics {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergStreamWriterMetrics.class);
+
// 1,024 reservoir size should cost about 8KB, which is quite small.
// It should also produce good accuracy for histogram distribution (like
percentiles).
private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
+ // Histogram metrics loaded through Flink's optional
flink-metrics-dropwizard dependency.
+ // Will be null if not available.
+ private static final DropwizardCtors DROPWIZARD = loadDropwizardCtors();
+
private final Counter flushedDataFiles;
private final Counter flushedDeleteFiles;
private final Counter flushedReferencedDataFiles;
@@ -51,18 +61,8 @@ public class IcebergStreamWriterMetrics {
this.lastFlushDurationMs = new AtomicLong();
writerMetrics.gauge("lastFlushDurationMs", lastFlushDurationMs::get);
- com.codahale.metrics.Histogram dropwizardDataFilesSizeHistogram =
- new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
- this.dataFilesSizeHistogram =
- writerMetrics.histogram(
- "dataFilesSizeHistogram",
- new DropwizardHistogramWrapper(dropwizardDataFilesSizeHistogram));
- com.codahale.metrics.Histogram dropwizardDeleteFilesSizeHistogram =
- new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
- this.deleteFilesSizeHistogram =
- writerMetrics.histogram(
- "deleteFilesSizeHistogram",
- new
DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram));
+ this.dataFilesSizeHistogram = registerHistogram(writerMetrics,
"dataFilesSizeHistogram");
+ this.deleteFilesSizeHistogram = registerHistogram(writerMetrics,
"deleteFilesSizeHistogram");
}
public void updateFlushResult(WriteResult result) {
@@ -74,16 +74,21 @@ public class IcebergStreamWriterMetrics {
// This should works equally well and we avoided the overhead of tracking
the list of file sizes
// in the {@link CommitSummary}, which currently stores simple stats for
counters and gauges
// metrics.
- Arrays.stream(result.dataFiles())
- .forEach(
- dataFile -> {
- dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
- });
- Arrays.stream(result.deleteFiles())
- .forEach(
- deleteFile -> {
-
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
- });
+ if (dataFilesSizeHistogram != null) {
+ Arrays.stream(result.dataFiles())
+ .forEach(
+ dataFile -> {
+ dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
+ });
+ }
+
+ if (deleteFilesSizeHistogram != null) {
+ Arrays.stream(result.deleteFiles())
+ .forEach(
+ deleteFile -> {
+
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
+ });
+ }
}
public void flushDuration(long flushDurationMs) {
@@ -97,4 +102,60 @@ public class IcebergStreamWriterMetrics {
public Counter getFlushedDeleteFiles() {
return flushedDeleteFiles;
}
+
+ @VisibleForTesting
+ Histogram dataFilesSizeHistogram() {
+ return dataFilesSizeHistogram;
+ }
+
+ @VisibleForTesting
+ Histogram deleteFilesSizeHistogram() {
+ return deleteFilesSizeHistogram;
+ }
+
+ private static Histogram registerHistogram(MetricGroup group, String name) {
+ Histogram histogram = newDropwizardHistogram();
+ return histogram != null ? group.histogram(name, histogram) : null;
+ }
+
+ private static Histogram newDropwizardHistogram() {
+ if (DROPWIZARD == null) {
+ return null;
+ }
+
+ Object reservoir =
DROPWIZARD.reservoirCtor.newInstance(HISTOGRAM_RESERVOIR_SIZE);
+ Object codahaleHistogram = DROPWIZARD.histogramCtor.newInstance(reservoir);
+ return DROPWIZARD.wrapperCtor.newInstance(codahaleHistogram);
+ }
+
+ private static DropwizardCtors loadDropwizardCtors() {
+ try {
+ Class<?> reservoirInterface =
+
DynClasses.builder().impl("com.codahale.metrics.Reservoir").buildChecked();
+ Class<?> codahaleHistogramClass =
+
DynClasses.builder().impl("com.codahale.metrics.Histogram").buildChecked();
+ return new DropwizardCtors(
+ DynConstructors.builder()
+ .impl("com.codahale.metrics.SlidingWindowReservoir", int.class)
+ .buildChecked(),
+ DynConstructors.builder()
+ .impl("com.codahale.metrics.Histogram", reservoirInterface)
+ .buildChecked(),
+ DynConstructors.builder(Histogram.class)
+ .impl(
+
"org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper",
+ codahaleHistogramClass)
+ .buildChecked());
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ LOG.warn(
+ "Cannot load Dropwizard metrics; is
org.apache.flink:flink-metrics-dropwizard on the classpath?",
+ e);
+ return null;
+ }
+ }
+
+ private record DropwizardCtors(
+ DynConstructors.Ctor<?> reservoirCtor,
+ DynConstructors.Ctor<?> histogramCtor,
+ DynConstructors.Ctor<Histogram> wrapperCtor) {}
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
new file mode 100644
index 0000000000..42bbfc0d36
--- /dev/null
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.iceberg.io.WriteResult;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergStreamWriterMetrics {
+
+ @Test
+ void histogramsCreatedWhenDropwizardAvailable() {
+ IcebergStreamWriterMetrics metrics =
+ new IcebergStreamWriterMetrics(
+ UnregisteredMetricsGroup.createSinkWriterMetricGroup(),
"db.table");
+
+ assertThat(metrics.dataFilesSizeHistogram()).isNotNull();
+ assertThat(metrics.deleteFilesSizeHistogram()).isNotNull();
+
+ assertThatNoException()
+ .isThrownBy(() ->
metrics.updateFlushResult(WriteResult.builder().build()));
+ }
+}
diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle
index 626cc01b28..94f851e032 100644
--- a/flink/v2.0/build.gradle
+++ b/flink/v2.0/build.gradle
@@ -169,9 +169,6 @@
project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
- // To support dropwizard histogram metrics (not shipped by Flink by
default)
- implementation libs.flink20.metrics.dropwizard
-
// for integration testing with the flink-runtime-jar
// all of those dependencies are required because the integration test
extends FlinkTestBase
integrationCompileOnly project(':iceberg-api')
diff --git a/flink/v2.0/flink-runtime/LICENSE b/flink/v2.0/flink-runtime/LICENSE
index 11460c3307..364652a5ac 100644
--- a/flink/v2.0/flink-runtime/LICENSE
+++ b/flink/v2.0/flink-runtime/LICENSE
@@ -460,22 +460,6 @@ License: Apache License, Version 2.0 -
https://www.apache.org/licenses/LICENSE-2
--------------------------------------------------------------------------------
-This product bundles Dropwizard Metrics.
-
-Copyright: (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
-Project URL: https://github.com/dropwizard/metrics
-License: Apache License, Version 2.0 -
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
-This product bundles Apache Flink's optional support for Dropwizard Metrics.
-
-Copyright: 2014-2026 The Apache Software Foundation
-Project URL: https://flink.apache.org/
-License: Apache License, Version 2.0 -
https://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
This product bundles RoaringBitmap.
Copyright: (c) 2013-... the RoaringBitmap authors
diff --git a/flink/v2.0/flink-runtime/runtime-deps.txt
b/flink/v2.0/flink-runtime/runtime-deps.txt
index c70e3fbba9..00c53ed388 100644
--- a/flink/v2.0/flink-runtime/runtime-deps.txt
+++ b/flink/v2.0/flink-runtime/runtime-deps.txt
@@ -6,11 +6,9 @@ com.github.luben:zstd-jni:1.5.7-3
com.google.errorprone:error_prone_annotations:2.10.0
dev.failsafe:failsafe:3.3.2
io.airlift:aircompressor:2.0.3
-io.dropwizard.metrics:metrics-core:3.2.6
org.apache.avro:avro:1.12.1
org.apache.datasketches:datasketches-java:6.2.0
org.apache.datasketches:datasketches-memory:3.0.2
-org.apache.flink:flink-metrics-dropwizard:2.0.0
org.apache.httpcomponents.client5:httpclient5:5.6
org.apache.httpcomponents.core5:httpcore5-h2:5.4
org.apache.httpcomponents.core5:httpcore5:5.4
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
index 434f396957..6cf15ff713 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java
@@ -18,23 +18,33 @@
*/
package org.apache.iceberg.flink.sink;
-import com.codahale.metrics.SlidingWindowReservoir;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.util.ScanTaskUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Internal
public class IcebergStreamWriterMetrics {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergStreamWriterMetrics.class);
+
// 1,024 reservoir size should cost about 8KB, which is quite small.
// It should also produce good accuracy for histogram distribution (like
percentiles).
private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
+ // Histogram metrics loaded through Flink's optional
flink-metrics-dropwizard dependency.
+ // Will be null if not available.
+ private static final DropwizardCtors DROPWIZARD = loadDropwizardCtors();
+
private final Counter flushedDataFiles;
private final Counter flushedDeleteFiles;
private final Counter flushedReferencedDataFiles;
@@ -51,18 +61,8 @@ public class IcebergStreamWriterMetrics {
this.lastFlushDurationMs = new AtomicLong();
writerMetrics.gauge("lastFlushDurationMs", lastFlushDurationMs::get);
- com.codahale.metrics.Histogram dropwizardDataFilesSizeHistogram =
- new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
- this.dataFilesSizeHistogram =
- writerMetrics.histogram(
- "dataFilesSizeHistogram",
- new DropwizardHistogramWrapper(dropwizardDataFilesSizeHistogram));
- com.codahale.metrics.Histogram dropwizardDeleteFilesSizeHistogram =
- new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE));
- this.deleteFilesSizeHistogram =
- writerMetrics.histogram(
- "deleteFilesSizeHistogram",
- new
DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram));
+ this.dataFilesSizeHistogram = registerHistogram(writerMetrics,
"dataFilesSizeHistogram");
+ this.deleteFilesSizeHistogram = registerHistogram(writerMetrics,
"deleteFilesSizeHistogram");
}
public void updateFlushResult(WriteResult result) {
@@ -74,16 +74,21 @@ public class IcebergStreamWriterMetrics {
// This should works equally well and we avoided the overhead of tracking
the list of file sizes
// in the {@link CommitSummary}, which currently stores simple stats for
counters and gauges
// metrics.
- Arrays.stream(result.dataFiles())
- .forEach(
- dataFile -> {
- dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
- });
- Arrays.stream(result.deleteFiles())
- .forEach(
- deleteFile -> {
-
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
- });
+ if (dataFilesSizeHistogram != null) {
+ Arrays.stream(result.dataFiles())
+ .forEach(
+ dataFile -> {
+ dataFilesSizeHistogram.update(dataFile.fileSizeInBytes());
+ });
+ }
+
+ if (deleteFilesSizeHistogram != null) {
+ Arrays.stream(result.deleteFiles())
+ .forEach(
+ deleteFile -> {
+
deleteFilesSizeHistogram.update(ScanTaskUtil.contentSizeInBytes(deleteFile));
+ });
+ }
}
public void flushDuration(long flushDurationMs) {
@@ -97,4 +102,60 @@ public class IcebergStreamWriterMetrics {
public Counter getFlushedDeleteFiles() {
return flushedDeleteFiles;
}
+
+ @VisibleForTesting
+ Histogram dataFilesSizeHistogram() {
+ return dataFilesSizeHistogram;
+ }
+
+ @VisibleForTesting
+ Histogram deleteFilesSizeHistogram() {
+ return deleteFilesSizeHistogram;
+ }
+
+ private static Histogram registerHistogram(MetricGroup group, String name) {
+ Histogram histogram = newDropwizardHistogram();
+ return histogram != null ? group.histogram(name, histogram) : null;
+ }
+
+ private static Histogram newDropwizardHistogram() {
+ if (DROPWIZARD == null) {
+ return null;
+ }
+
+ Object reservoir =
DROPWIZARD.reservoirCtor.newInstance(HISTOGRAM_RESERVOIR_SIZE);
+ Object codahaleHistogram = DROPWIZARD.histogramCtor.newInstance(reservoir);
+ return DROPWIZARD.wrapperCtor.newInstance(codahaleHistogram);
+ }
+
+ private static DropwizardCtors loadDropwizardCtors() {
+ try {
+ Class<?> reservoirInterface =
+
DynClasses.builder().impl("com.codahale.metrics.Reservoir").buildChecked();
+ Class<?> codahaleHistogramClass =
+
DynClasses.builder().impl("com.codahale.metrics.Histogram").buildChecked();
+ return new DropwizardCtors(
+ DynConstructors.builder()
+ .impl("com.codahale.metrics.SlidingWindowReservoir", int.class)
+ .buildChecked(),
+ DynConstructors.builder()
+ .impl("com.codahale.metrics.Histogram", reservoirInterface)
+ .buildChecked(),
+ DynConstructors.builder(Histogram.class)
+ .impl(
+
"org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper",
+ codahaleHistogramClass)
+ .buildChecked());
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ LOG.warn(
+ "Cannot load Dropwizard metrics; is
org.apache.flink:flink-metrics-dropwizard on the classpath?",
+ e);
+ return null;
+ }
+ }
+
+ private record DropwizardCtors(
+ DynConstructors.Ctor<?> reservoirCtor,
+ DynConstructors.Ctor<?> histogramCtor,
+ DynConstructors.Ctor<Histogram> wrapperCtor) {}
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
new file mode 100644
index 0000000000..42bbfc0d36
--- /dev/null
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriterMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.iceberg.io.WriteResult;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergStreamWriterMetrics {
+
+ @Test
+ void histogramsCreatedWhenDropwizardAvailable() {
+ IcebergStreamWriterMetrics metrics =
+ new IcebergStreamWriterMetrics(
+ UnregisteredMetricsGroup.createSinkWriterMetricGroup(),
"db.table");
+
+ assertThat(metrics.dataFilesSizeHistogram()).isNotNull();
+ assertThat(metrics.deleteFilesSizeHistogram()).isNotNull();
+
+ assertThatNoException()
+ .isThrownBy(() ->
metrics.updateFlushResult(WriteResult.builder().build()));
+ }
+}