This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c2e599a8ee [INLONG-11065][Sort] Add openTelemetryAppender for the sort
connector (#11066)
c2e599a8ee is described below
commit c2e599a8eed959f8a976b92cbdb0fda6a0df146e
Author: Haotian Ma <[email protected]>
AuthorDate: Tue Sep 24 10:37:49 2024 +0800
[INLONG-11065][Sort] Add openTelemetryAppender for the sort connector
(#11066)
* [INLONG-11065][Sort] Provides a method to add openTelemetryAppender for
the sort connector
* [INLONG-11065][Sort] Optimize the code format of createOpenTelemetrySdk
function
---
inlong-sort/sort-flink/base/pom.xml | 70 +++++++++-
.../inlong/sort/base/util/OpenTelemetryLogger.java | 153 +++++++++++++++++++++
pom.xml | 1 +
3 files changed, 223 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-flink/base/pom.xml
b/inlong-sort/sort-flink/base/pom.xml
index 12b10fe19d..ef1c4f1dc5 100644
--- a/inlong-sort/sort-flink/base/pom.xml
+++ b/inlong-sort/sort-flink/base/pom.xml
@@ -63,8 +63,76 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
- </dependencies>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.instrumentation</groupId>
+ <artifactId>opentelemetry-instrumentation-api</artifactId>
+ <version>${otel.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.instrumentation</groupId>
+ <artifactId>opentelemetry-log4j-appender-2.17</artifactId>
+ <version>${otel.alpha.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-trace</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-exporter-otlp</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-semconv</artifactId>
+ <version>${otel.alpha.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-logs</artifactId>
+ <version>${otel.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>io.opentelemetry*</include>
+ <include>com.squareup.okhttp3</include>
+ <include>com.squareup.okio</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
<profiles>
<profile>
<id>v1.13</id>
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
new file mode 100644
index 0000000000..82c88cf5f7
--- /dev/null
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter;
+import
io.opentelemetry.instrumentation.log4j.appender.v2_17.OpenTelemetryAppender;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.logs.SdkLoggerProvider;
+import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
+import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+
+public class OpenTelemetryLogger {
+
+ private OpenTelemetrySdk SDK; // OpenTelemetry SDK
+
+ private final String endpoint; // OpenTelemetry Exporter Endpoint
+
+ private final String serviceName; // OpenTelemetry Service Name
+
+ private final Layout<?> layout; // Log4j Layout
+
+ private final Level logLevel; // Log4j Log Level
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OpenTelemetryLogger.class);
+
+ public OpenTelemetryLogger() {
+ // Default Service Name
+ serviceName = "inlong-sort-connector";
+ // Get OpenTelemetry Exporter Endpoint from Environment Variable
+ if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) {
+ endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT");
+ } else {
+ endpoint = "localhost:4317";
+ }
+ // Default Log4j Layout
+ this.layout = PatternLayout.newBuilder()
+ .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} -
%msg%n")
+ .withCharset(StandardCharsets.UTF_8)
+ .build();
+ // Default Log4j Log Level
+ this.logLevel = Level.INFO;
+ }
+
+ public OpenTelemetryLogger(String serviceName, String endpoint, Layout<?>
layout, Level logLevel) {
+ this.serviceName = serviceName;
+ this.endpoint = endpoint;
+ this.layout = layout;
+ this.logLevel = logLevel;
+ }
+
+ private void createOpenTelemetrySdk() {
+ // Create OpenTelemetry SDK
+ OpenTelemetrySdkBuilder sdkBuilder = OpenTelemetrySdk.builder();
+ // Create Logger Provider Builder
+ SdkLoggerProviderBuilder loggerProviderBuilder =
SdkLoggerProvider.builder();
+ // get Resource
+ Resource resource = Resource.getDefault().toBuilder()
+ .put(ResourceAttributes.SERVICE_NAME, this.serviceName)
+ .build();
+ // set Resource
+ loggerProviderBuilder.setResource(resource);
+ // Create OpenTelemetry Exporter
+ OtlpGrpcLogRecordExporter exporter =
OtlpGrpcLogRecordExporter.builder()
+ .setEndpoint("http://" + this.endpoint)
+ .build();
+ // Create BatchLogRecordProcessor use OpenTelemetry Exporter
+ BatchLogRecordProcessor batchLogRecordProcessor =
BatchLogRecordProcessor.builder(exporter).build();
+ // Add BatchLogRecordProcessor to Logger Provider Builder
+ loggerProviderBuilder.addLogRecordProcessor(batchLogRecordProcessor);
+ // set Logger Provider
+ sdkBuilder.setLoggerProvider(loggerProviderBuilder.build());
+ // Build OpenTelemetry SDK
+ SDK = sdkBuilder.build();
+ }
+
+ public void addOpenTelemetryAppender() {
+ org.apache.logging.log4j.spi.LoggerContext context =
LogManager.getContext(false);
+ LoggerContext loggerContext = (LoggerContext) context;
+ Configuration config = loggerContext.getConfiguration();
+ // Create OpenTelemetryAppender
+ OpenTelemetryAppender otelAppender = OpenTelemetryAppender.builder()
+ .setName("OpenTelemetryAppender")
+ .setLayout(this.layout)
+ .build();
+ otelAppender.start();
+ // add OpenTelemetryAppender to configuration
+ config.addAppender(otelAppender);
+ // Get Root Logger Configuration
+ LoggerConfig loggerConfig =
config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME);
+ // Add OpenTelemetryAppender to Root Logger
+ loggerConfig.addAppender(otelAppender, this.logLevel, null);
+ // refresh logger context
+ loggerContext.updateLoggers();
+ }
+
+ public void removeOpenTelemetryAppender() {
+ org.apache.logging.log4j.spi.LoggerContext context =
LogManager.getContext(false);
+ LoggerContext loggerContext = (LoggerContext) context;
+ Configuration config = loggerContext.getConfiguration();
+ config.getAppenders().values().forEach(appender -> {
+ // Remove OpenTelemetryAppender
+ if (appender instanceof OpenTelemetryAppender) {
+ config.getRootLogger().removeAppender(appender.getName());
+ appender.stop();
+ }
+ });
+ // refresh logger context
+ loggerContext.updateLoggers();
+ }
+
+ public void install() {
+ addOpenTelemetryAppender();
+ createOpenTelemetrySdk();
+ OpenTelemetryAppender.install(SDK);
+ LOG.info("OpenTelemetryLogger installed");
+ }
+
+ public void uninstall() {
+ LOG.info("OpenTelemetryLogger uninstalled");
+ SDK.close();
+ removeOpenTelemetryAppender();
+
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index babebca3a7..a313eb73ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,7 @@
<jedis.version>2.9.0</jedis.version>
<poi.version>5.2.3</poi.version>
<otel.version>1.28.0</otel.version>
+ <otel.alpha.version>1.28.0-alpha</otel.alpha.version>
<tencentcloud-api.version>3.1.830</tencentcloud-api.version>
<woodstox-core.version>5.4.0</woodstox-core.version>
<libfb303.version>0.9.3</libfb303.version>