This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c2e599a8ee [INLONG-11065][Sort] Add openTelemetryAppender for the sort 
connector (#11066)
c2e599a8ee is described below

commit c2e599a8eed959f8a976b92cbdb0fda6a0df146e
Author: Haotian Ma <[email protected]>
AuthorDate: Tue Sep 24 10:37:49 2024 +0800

    [INLONG-11065][Sort] Add openTelemetryAppender for the sort connector 
(#11066)
    
    * [INLONG-11065][Sort] Provides a method to add openTelemetryAppender for 
the sort connector
    
    * [INLONG-11065][Sort] Optimize the code format of createOpenTelemetrySdk 
function
---
 inlong-sort/sort-flink/base/pom.xml                |  70 +++++++++-
 .../inlong/sort/base/util/OpenTelemetryLogger.java | 153 +++++++++++++++++++++
 pom.xml                                            |   1 +
 3 files changed, 223 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-flink/base/pom.xml 
b/inlong-sort/sort-flink/base/pom.xml
index 12b10fe19d..ef1c4f1dc5 100644
--- a/inlong-sort/sort-flink/base/pom.xml
+++ b/inlong-sort/sort-flink/base/pom.xml
@@ -63,8 +63,76 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
-    </dependencies>
 
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry.instrumentation</groupId>
+            <artifactId>opentelemetry-instrumentation-api</artifactId>
+            <version>${otel.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry.instrumentation</groupId>
+            <artifactId>opentelemetry-log4j-appender-2.17</artifactId>
+            <version>${otel.alpha.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk-trace</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-otlp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-semconv</artifactId>
+            <version>${otel.alpha.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk-logs</artifactId>
+            <version>${otel.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.okhttp3</include>
+                                    <include>com.squareup.okio</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
     <profiles>
         <profile>
             <id>v1.13</id>
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
new file mode 100644
index 0000000000..82c88cf5f7
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter;
+import 
io.opentelemetry.instrumentation.log4j.appender.v2_17.OpenTelemetryAppender;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.logs.SdkLoggerProvider;
+import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
+import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+
+public class OpenTelemetryLogger {
+
+    private OpenTelemetrySdk SDK; // OpenTelemetry SDK
+
+    private final String endpoint; // OpenTelemetry Exporter Endpoint
+
+    private final String serviceName; // OpenTelemetry Service Name
+
+    private final Layout<?> layout; // Log4j Layout
+
+    private final Level logLevel; // Log4j Log Level
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OpenTelemetryLogger.class);
+
+    public OpenTelemetryLogger() {
+        // Default Service Name
+        serviceName = "inlong-sort-connector";
+        // Get OpenTelemetry Exporter Endpoint from Environment Variable
+        if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) {
+            endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT");
+        } else {
+            endpoint = "localhost:4317";
+        }
+        // Default Log4j Layout
+        this.layout = PatternLayout.newBuilder()
+                .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n")
+                .withCharset(StandardCharsets.UTF_8)
+                .build();
+        // Default Log4j Log Level
+        this.logLevel = Level.INFO;
+    }
+
+    public OpenTelemetryLogger(String serviceName, String endpoint, Layout<?> 
layout, Level logLevel) {
+        this.serviceName = serviceName;
+        this.endpoint = endpoint;
+        this.layout = layout;
+        this.logLevel = logLevel;
+    }
+
+    private void createOpenTelemetrySdk() {
+        // Create OpenTelemetry SDK
+        OpenTelemetrySdkBuilder sdkBuilder = OpenTelemetrySdk.builder();
+        // Create Logger Provider Builder
+        SdkLoggerProviderBuilder loggerProviderBuilder = 
SdkLoggerProvider.builder();
+        // get Resource
+        Resource resource = Resource.getDefault().toBuilder()
+                .put(ResourceAttributes.SERVICE_NAME, this.serviceName)
+                .build();
+        // set Resource
+        loggerProviderBuilder.setResource(resource);
+        // Create OpenTelemetry Exporter
+        OtlpGrpcLogRecordExporter exporter = 
OtlpGrpcLogRecordExporter.builder()
+                .setEndpoint("http://"; + this.endpoint)
+                .build();
+        // Create BatchLogRecordProcessor use OpenTelemetry Exporter
+        BatchLogRecordProcessor batchLogRecordProcessor = 
BatchLogRecordProcessor.builder(exporter).build();
+        // Add BatchLogRecordProcessor to Logger Provider Builder
+        loggerProviderBuilder.addLogRecordProcessor(batchLogRecordProcessor);
+        // set Logger Provider
+        sdkBuilder.setLoggerProvider(loggerProviderBuilder.build());
+        // Build OpenTelemetry SDK
+        SDK = sdkBuilder.build();
+    }
+
+    public void addOpenTelemetryAppender() {
+        org.apache.logging.log4j.spi.LoggerContext context = 
LogManager.getContext(false);
+        LoggerContext loggerContext = (LoggerContext) context;
+        Configuration config = loggerContext.getConfiguration();
+        // Create OpenTelemetryAppender
+        OpenTelemetryAppender otelAppender = OpenTelemetryAppender.builder()
+                .setName("OpenTelemetryAppender")
+                .setLayout(this.layout)
+                .build();
+        otelAppender.start();
+        // add OpenTelemetryAppender to configuration
+        config.addAppender(otelAppender);
+        // Get Root Logger Configuration
+        LoggerConfig loggerConfig = 
config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME);
+        // Add OpenTelemetryAppender to Root Logger
+        loggerConfig.addAppender(otelAppender, this.logLevel, null);
+        // refresh logger context
+        loggerContext.updateLoggers();
+    }
+
+    public void removeOpenTelemetryAppender() {
+        org.apache.logging.log4j.spi.LoggerContext context = 
LogManager.getContext(false);
+        LoggerContext loggerContext = (LoggerContext) context;
+        Configuration config = loggerContext.getConfiguration();
+        config.getAppenders().values().forEach(appender -> {
+            // Remove OpenTelemetryAppender
+            if (appender instanceof OpenTelemetryAppender) {
+                config.getRootLogger().removeAppender(appender.getName());
+                appender.stop();
+            }
+        });
+        // refresh logger context
+        loggerContext.updateLoggers();
+    }
+
+    public void install() {
+        addOpenTelemetryAppender();
+        createOpenTelemetrySdk();
+        OpenTelemetryAppender.install(SDK);
+        LOG.info("OpenTelemetryLogger installed");
+    }
+
+    public void uninstall() {
+        LOG.info("OpenTelemetryLogger uninstalled");
+        SDK.close();
+        removeOpenTelemetryAppender();
+
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index babebca3a7..a313eb73ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,7 @@
         <jedis.version>2.9.0</jedis.version>
         <poi.version>5.2.3</poi.version>
         <otel.version>1.28.0</otel.version>
+        <otel.alpha.version>1.28.0-alpha</otel.alpha.version>
         <tencentcloud-api.version>3.1.830</tencentcloud-api.version>
         <woodstox-core.version>5.4.0</woodstox-core.version>
         <libfb303.version>0.9.3</libfb303.version>

Reply via email to