This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7341131baad [FLINK-39104][metrics] Add configurable gzip compression for OTel gRPC/HTTP exporters
7341131baad is described below

commit 7341131baad8af667341817636952a93b33d25be
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Thu Jun 5 16:35:46 2025 +0100

    [FLINK-39104][metrics] Add configurable gzip compression for OTel gRPC/HTTP exporters
---
 .../open_telemetry_reporter_configuration.html     |  6 ++++
 .../events/otel/OpenTelemetryEventReporter.java    |  3 ++
 .../metrics/otel/OpenTelemetryMetricReporter.java  |  3 ++
 .../metrics/otel/OpenTelemetryReporterOptions.java | 33 ++++++++++++++++++++++
 .../traces/otel/OpenTelemetryTraceReporter.java    |  3 ++
 .../AbstractOpenTelemetryReporterProtocolTest.java | 31 ++++++++++++++++++++
 6 files changed, 79 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
index b3142a4ad14..1c1193f19bd 100644
--- a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
+++ b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>metrics.reporter.OpenTelemetry.exporter.compression</h5></td>
+            <td style="word-wrap: break-word;">"none"</td>
+            <td>String</td>
+            <td>Compression method for OTel Reporter only 'gzip' or 'none'. Default is 'none'.</td>
+        </tr>
         <tr>
             <td><h5>metrics.reporter.OpenTelemetry.exporter.endpoint</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java
index 265f70384c4..c7bc33402b3 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java
@@ -42,6 +42,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureCompression;
 import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureEndpoint;
 import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureTimeout;
 
@@ -75,6 +76,7 @@ public class OpenTelemetryEventReporter extends 
OpenTelemetryReporterBase implem
                 OtlpHttpLogRecordExporterBuilder httpBuilder = 
OtlpHttpLogRecordExporter.builder();
                 tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
+                tryConfigureCompression(metricConfig, 
httpBuilder::setCompression);
                 logRecordExporter = httpBuilder.build();
                 break;
             default:
@@ -86,6 +88,7 @@ public class OpenTelemetryEventReporter extends 
OpenTelemetryReporterBase implem
                 OtlpGrpcLogRecordExporterBuilder grpcBuilder = 
OtlpGrpcLogRecordExporter.builder();
                 tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
+                tryConfigureCompression(metricConfig, 
grpcBuilder::setCompression);
                 logRecordExporter = grpcBuilder.build();
                 break;
         }
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index facdcf6f0dd..e0e03db71ce 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -58,6 +58,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureCompression;
 import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureEndpoint;
 import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureTimeout;
 
@@ -106,6 +107,7 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
                 OtlpHttpMetricExporterBuilder httpBuilder = 
OtlpHttpMetricExporter.builder();
                 tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
+                tryConfigureCompression(metricConfig, 
httpBuilder::setCompression);
                 exporter = httpBuilder.build();
                 break;
             default:
@@ -117,6 +119,7 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
                 OtlpGrpcMetricExporterBuilder grpcBuilder = 
OtlpGrpcMetricExporter.builder();
                 tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
+                tryConfigureCompression(metricConfig, 
grpcBuilder::setCompression);
                 exporter = grpcBuilder.build();
                 break;
         }
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
index 9cd7c69ab52..94362ec8ec7 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
@@ -43,6 +43,9 @@ public final class OpenTelemetryReporterOptions {
         HTTP
     }
 
+    public static final String COMPRESSION_NONE = "none";
+    public static final String COMPRESSION_GZIP = "gzip";
+
     private OpenTelemetryReporterOptions() {}
 
     public static final ConfigOption<Protocol> EXPORTER_PROTOCOL =
@@ -73,6 +76,20 @@ public final class OpenTelemetryReporterOptions {
                                             "Timeout for OpenTelemetry Reporters, as Duration string. Example: 10s for 10 seconds")
                                     .build());
 
+    public static final ConfigOption<String> EXPORTER_COMPRESSION =
+            ConfigOptions.key("exporter.compression")
+                    .stringType()
+                    .defaultValue(COMPRESSION_NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            String.format(
+                                                    "Compression method for OTel Reporter only '%s' or '%s'. Default is '%s'.",
+                                                    COMPRESSION_GZIP,
+                                                    COMPRESSION_NONE,
+                                                    COMPRESSION_NONE))
+                                    .build());
+
     public static final ConfigOption<String> SERVICE_NAME =
             ConfigOptions.key("service.name")
                     .stringType()
@@ -107,4 +124,20 @@ public final class OpenTelemetryReporterOptions {
                 metricConfig.containsKey(endpointConfKey), "Must set " + 
EXPORTER_ENDPOINT.key());
         builder.accept(metricConfig.getProperty(endpointConfKey));
     }
+
+    @Internal
+    public static void tryConfigureCompression(
+            MetricConfig metricConfig, Consumer<String> builder) {
+        final String compressionConfKey = EXPORTER_COMPRESSION.key();
+        if (metricConfig.containsKey(compressionConfKey)) {
+            String compression = metricConfig.getProperty(compressionConfKey);
+            checkArgument(
+                    COMPRESSION_NONE.equals(compression) || COMPRESSION_GZIP.equals(compression),
+                    "Unsupported compression method: '%s'. Supported values are '%s' and '%s'.",
+                    compression,
+                    COMPRESSION_NONE,
+                    COMPRESSION_GZIP);
+            builder.accept(compression);
+        }
+    }
 }
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
index 903093a88c4..14c453021d0 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
@@ -43,6 +43,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureCompression;
 import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureEndpoint;
 import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureTimeout;
 
@@ -72,6 +73,7 @@ public class OpenTelemetryTraceReporter extends 
OpenTelemetryReporterBase implem
                 OtlpHttpSpanExporterBuilder httpBuilder = 
OtlpHttpSpanExporter.builder();
                 tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
+                tryConfigureCompression(metricConfig, 
httpBuilder::setCompression);
                 spanExporter = httpBuilder.build();
                 break;
             default:
@@ -83,6 +85,7 @@ public class OpenTelemetryTraceReporter extends 
OpenTelemetryReporterBase implem
                 OtlpGrpcSpanExporterBuilder grpcBuilder = 
OtlpGrpcSpanExporter.builder();
                 tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
+                tryConfigureCompression(metricConfig, 
grpcBuilder::setCompression);
                 spanExporter = grpcBuilder.build();
                 break;
         }
diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java
index b564560933c..7cee8760805 100644
--- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java
+++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 /** Base class for OpenTelemetry reporter protocol configuration tests. */
 @ExtendWith(TestLoggerExtension.class)
 public abstract class AbstractOpenTelemetryReporterProtocolTest<T> extends 
OpenTelemetryTestBase {
@@ -76,6 +78,35 @@ public abstract class 
AbstractOpenTelemetryReporterProtocolTest<T> extends OpenT
         assertReported();
     }
 
+    @Test
+    public void testGzipCompressionGrpc() throws Exception {
+        MetricConfig config = createConfig("grpc");
+        config.setProperty(
+                OpenTelemetryReporterOptions.EXPORTER_COMPRESSION.key(),
+                OpenTelemetryReporterOptions.COMPRESSION_GZIP);
+        setupAndReport(config);
+        assertReported();
+    }
+
+    @Test
+    public void testGzipCompressionHttp() throws Exception {
+        MetricConfig config = createConfig("http");
+        config.setProperty(
+                OpenTelemetryReporterOptions.EXPORTER_COMPRESSION.key(),
+                OpenTelemetryReporterOptions.COMPRESSION_GZIP);
+        setupAndReport(config);
+        assertReported();
+    }
+
+    @Test
+    public void testInvalidCompressionMethodThrows() {
+        MetricConfig config = createConfig("grpc");
+        config.setProperty(OpenTelemetryReporterOptions.EXPORTER_COMPRESSION.key(), "invalid");
+        assertThatThrownBy(() -> setupAndReport(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported compression method");
+    }
+
     protected MetricConfig createConfig(String protocol) {
         boolean isHttp = protocol.equalsIgnoreCase("HTTP");
         MetricConfig config = isHttp ? new MetricConfig() : 
createMetricConfig();

Reply via email to