This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7341131baad [FLINK-39104][metrics] Add configurable gzip compression for OTel gRPC/HTTP exporters
7341131baad is described below
commit 7341131baad8af667341817636952a93b33d25be
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Thu Jun 5 16:35:46 2025 +0100
[FLINK-39104][metrics] Add configurable gzip compression for OTel gRPC/HTTP exporters
---
.../open_telemetry_reporter_configuration.html | 6 ++++
.../events/otel/OpenTelemetryEventReporter.java | 3 ++
.../metrics/otel/OpenTelemetryMetricReporter.java | 3 ++
.../metrics/otel/OpenTelemetryReporterOptions.java | 33 ++++++++++++++++++++++
.../traces/otel/OpenTelemetryTraceReporter.java | 3 ++
.../AbstractOpenTelemetryReporterProtocolTest.java | 31 ++++++++++++++++++++
6 files changed, 79 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
index b3142a4ad14..1c1193f19bd 100644
--- a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
+++ b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>metrics.reporter.OpenTelemetry.exporter.compression</h5></td>
+ <td style="word-wrap: break-word;">"none"</td>
+ <td>String</td>
+ <td>Compression method for OTel Reporter only 'gzip' or 'none'. Default is 'none'.</td>
+ </tr>
<tr>
<td><h5>metrics.reporter.OpenTelemetry.exporter.endpoint</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java
index 265f70384c4..c7bc33402b3 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java
@@ -42,6 +42,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureCompression;
import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureEndpoint;
import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureTimeout;
@@ -75,6 +76,7 @@ public class OpenTelemetryEventReporter extends OpenTelemetryReporterBase implem
OtlpHttpLogRecordExporterBuilder httpBuilder = OtlpHttpLogRecordExporter.builder();
tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
+ tryConfigureCompression(metricConfig, httpBuilder::setCompression);
logRecordExporter = httpBuilder.build();
break;
default:
@@ -86,6 +88,7 @@ public class OpenTelemetryEventReporter extends OpenTelemetryReporterBase implem
OtlpGrpcLogRecordExporterBuilder grpcBuilder = OtlpGrpcLogRecordExporter.builder();
tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
+ tryConfigureCompression(metricConfig, grpcBuilder::setCompression);
logRecordExporter = grpcBuilder.build();
break;
}
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index facdcf6f0dd..e0e03db71ce 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -58,6 +58,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureCompression;
import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureEndpoint;
import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureTimeout;
@@ -106,6 +107,7 @@ public class OpenTelemetryMetricReporter extends OpenTelemetryReporterBase
OtlpHttpMetricExporterBuilder httpBuilder = OtlpHttpMetricExporter.builder();
tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
+ tryConfigureCompression(metricConfig, httpBuilder::setCompression);
exporter = httpBuilder.build();
break;
default:
@@ -117,6 +119,7 @@ public class OpenTelemetryMetricReporter extends OpenTelemetryReporterBase
OtlpGrpcMetricExporterBuilder grpcBuilder = OtlpGrpcMetricExporter.builder();
tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
+ tryConfigureCompression(metricConfig, grpcBuilder::setCompression);
exporter = grpcBuilder.build();
break;
}
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
index 9cd7c69ab52..94362ec8ec7 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
@@ -43,6 +43,9 @@ public final class OpenTelemetryReporterOptions {
HTTP
}
+ public static final String COMPRESSION_NONE = "none";
+ public static final String COMPRESSION_GZIP = "gzip";
+
private OpenTelemetryReporterOptions() {}
public static final ConfigOption<Protocol> EXPORTER_PROTOCOL =
@@ -73,6 +76,20 @@ public final class OpenTelemetryReporterOptions {
"Timeout for OpenTelemetry Reporters, as Duration string. Example: 10s for 10 seconds")
.build());
+ public static final ConfigOption<String> EXPORTER_COMPRESSION =
+ ConfigOptions.key("exporter.compression")
+ .stringType()
+ .defaultValue(COMPRESSION_NONE)
+ .withDescription(
+ Description.builder()
+ .text(
+ String.format(
+ "Compression method for OTel Reporter only '%s' or '%s'. Default is '%s'.",
+ COMPRESSION_GZIP,
+ COMPRESSION_NONE,
+ COMPRESSION_NONE))
+ .build());
+
public static final ConfigOption<String> SERVICE_NAME =
ConfigOptions.key("service.name")
.stringType()
@@ -107,4 +124,20 @@ public final class OpenTelemetryReporterOptions {
metricConfig.containsKey(endpointConfKey), "Must set " + EXPORTER_ENDPOINT.key());
builder.accept(metricConfig.getProperty(endpointConfKey));
}
+
+ @Internal
+ public static void tryConfigureCompression(
+ MetricConfig metricConfig, Consumer<String> builder) {
+ final String compressionConfKey = EXPORTER_COMPRESSION.key();
+ if (metricConfig.containsKey(compressionConfKey)) {
+ String compression = metricConfig.getProperty(compressionConfKey);
+ checkArgument(
+ COMPRESSION_NONE.equals(compression) || COMPRESSION_GZIP.equals(compression),
+ "Unsupported compression method: '%s'. Supported values are '%s' and '%s'.",
+ compression,
+ COMPRESSION_NONE,
+ COMPRESSION_GZIP);
+ builder.accept(compression);
+ }
+ }
}
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
index 903093a88c4..14c453021d0 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
@@ -43,6 +43,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureCompression;
import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureEndpoint;
import static org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.tryConfigureTimeout;
@@ -72,6 +73,7 @@ public class OpenTelemetryTraceReporter extends OpenTelemetryReporterBase implem
OtlpHttpSpanExporterBuilder httpBuilder = OtlpHttpSpanExporter.builder();
tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
+ tryConfigureCompression(metricConfig, httpBuilder::setCompression);
spanExporter = httpBuilder.build();
break;
default:
@@ -83,6 +85,7 @@ public class OpenTelemetryTraceReporter extends OpenTelemetryReporterBase implem
OtlpGrpcSpanExporterBuilder grpcBuilder = OtlpGrpcSpanExporter.builder();
tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
+ tryConfigureCompression(metricConfig, grpcBuilder::setCompression);
spanExporter = grpcBuilder.build();
break;
}
diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java
index b564560933c..7cee8760805 100644
--- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java
+++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/AbstractOpenTelemetryReporterProtocolTest.java
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/** Base class for OpenTelemetry reporter protocol configuration tests. */
@ExtendWith(TestLoggerExtension.class)
public abstract class AbstractOpenTelemetryReporterProtocolTest<T> extends OpenTelemetryTestBase {
@@ -76,6 +78,35 @@ public abstract class AbstractOpenTelemetryReporterProtocolTest<T> extends OpenT
assertReported();
}
+ @Test
+ public void testGzipCompressionGrpc() throws Exception {
+ MetricConfig config = createConfig("grpc");
+ config.setProperty(
+ OpenTelemetryReporterOptions.EXPORTER_COMPRESSION.key(),
+ OpenTelemetryReporterOptions.COMPRESSION_GZIP);
+ setupAndReport(config);
+ assertReported();
+ }
+
+ @Test
+ public void testGzipCompressionHttp() throws Exception {
+ MetricConfig config = createConfig("http");
+ config.setProperty(
+ OpenTelemetryReporterOptions.EXPORTER_COMPRESSION.key(),
+ OpenTelemetryReporterOptions.COMPRESSION_GZIP);
+ setupAndReport(config);
+ assertReported();
+ }
+
+ @Test
+ public void testInvalidCompressionMethodThrows() {
+ MetricConfig config = createConfig("grpc");
+ config.setProperty(OpenTelemetryReporterOptions.EXPORTER_COMPRESSION.key(), "invalid");
+ assertThatThrownBy(() -> setupAndReport(config))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Unsupported compression method");
+ }
+
protected MetricConfig createConfig(String protocol) {
boolean isHttp = protocol.equalsIgnoreCase("HTTP");
MetricConfig config = isHttp ? new MetricConfig() : createMetricConfig();