This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fd267b900d [INLONG-11199][Sort] Integrate Grafana Loki for connectors (#11212)
fd267b900d is described below
commit fd267b900d988bda07a9425923e3ced36960d7be
Author: Haotian Ma <[email protected]>
AuthorDate: Sat Oct 12 10:28:52 2024 +0800
[INLONG-11199][Sort] Integrate Grafana Loki for connectors (#11212)
---
docker/docker-compose/docker-compose.yml | 56 +++++++++-
docker/docker-compose/log-system/loki.yaml | 58 ++++++++++
docker/docker-compose/log-system/otel-config.yaml | 39 +++++++
.../inlong/sort/base/util/OpenTelemetryLogger.java | 120 +++++++++++++++++----
4 files changed, 253 insertions(+), 20 deletions(-)
diff --git a/docker/docker-compose/docker-compose.yml b/docker/docker-compose/docker-compose.yml
index 1e894b00b8..a3573d799c 100644
--- a/docker/docker-compose/docker-compose.yml
+++ b/docker/docker-compose/docker-compose.yml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-version: '2.4'
+version: '3.0'
services:
mysql:
@@ -129,6 +129,7 @@ services:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
+ - OTEL_EXPORTER_ENDPOINT=logcollector:4317
ports:
- "8081:8081"
command: jobmanager
@@ -142,4 +143,57 @@ services:
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
+ - OTEL_EXPORTER_ENDPOINT=logcollector:4317
command: taskmanager
+
+ # The following services are used to collect logs for InLong-sort. They are not enabled by default; you can enable them by adding `--profile sort-report up` to the `docker-compose` command
+ # opentelemetry collector
+ logcollector:
+ image: otel/opentelemetry-collector-contrib:0.110.0
+ container_name: logcollector
+ volumes:
+ - ./log-system/otel-config.yaml:/otel-config.yaml
+ command: [ "--config=/otel-config.yaml"]
+ profiles: [sort-report]
+ ports:
+ - "4317:4317"
+
+ # grafana loki
+ loki:
+ image: grafana/loki:3.0.0
+ ports:
+ - "3100:3100"
+ profiles: [sort-report]
+ volumes:
+ - ./log-system/loki.yaml:/etc/loki/local-config.yaml
+ command: -config.file=/etc/loki/local-config.yaml
+
+ # grafana
+ grafana:
+ environment:
+ - GF_PATHS_PROVISIONING=/etc/grafana/provisioning
+ - GF_AUTH_ANONYMOUS_ENABLED=true
+ - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
+ entrypoint:
+ - sh
+ - -euc
+ - |
+ mkdir -p /etc/grafana/provisioning/datasources
+ cat <<EOF > /etc/grafana/provisioning/datasources/ds.yaml
+ apiVersion: 1
+ datasources:
+ - name: Loki
+ type: loki
+ access: proxy
+ orgId: 1
+ url: http://loki:3100
+ basicAuth: false
+ isDefault: true
+ version: 1
+ editable: false
+ EOF
+ /run.sh
+ image: grafana/grafana:latest
+ ports:
+ - "3000:3000"
+ profiles: [sort-report]
\ No newline at end of file
diff --git a/docker/docker-compose/log-system/loki.yaml b/docker/docker-compose/log-system/loki.yaml
new file mode 100644
index 0000000000..746f8baac5
--- /dev/null
+++ b/docker/docker-compose/log-system/loki.yaml
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+auth_enabled: false
+
+limits_config:
+ allow_structured_metadata: true
+ volume_enabled: true
+ otlp_config:
+ resource_attributes:
+ attributes_config:
+ - action: index_label
+ attributes:
+ - level
+server:
+ http_listen_port: 3100
+
+common:
+ ring:
+ instance_addr: 0.0.0.0
+ kvstore:
+ store: inmemory
+ replication_factor: 1
+ path_prefix: /tmp/loki
+
+schema_config:
+ configs:
+ - from: 2020-05-15
+ store: tsdb
+ object_store: filesystem
+ schema: v13
+ index:
+ prefix: index_
+ period: 24h
+
+storage_config:
+ tsdb_shipper:
+ active_index_directory: /tmp/loki/index
+ cache_location: /tmp/loki/index_cache
+ filesystem:
+ directory: /tmp/loki/chunks
+
+pattern_ingester:
+ enabled: true
diff --git a/docker/docker-compose/log-system/otel-config.yaml b/docker/docker-compose/log-system/otel-config.yaml
new file mode 100644
index 0000000000..6942f11e0f
--- /dev/null
+++ b/docker/docker-compose/log-system/otel-config.yaml
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: logcollector:4317
+processors:
+ batch:
+
+exporters:
+ logging:
+ verbosity: detailed
+ otlphttp:
+ endpoint: http://loki:3100/otlp
+ tls:
+ insecure: true
+
+service:
+ pipelines:
+ logs:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [otlphttp, logging]
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
index 82c88cf5f7..54dfefcf11 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java
@@ -38,6 +38,9 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
+/**
+ * OpenTelemetryLogger to collect logs and send to OpenTelemetry
+ */
public class OpenTelemetryLogger {
private OpenTelemetrySdk SDK; // OpenTelemetry SDK
@@ -50,33 +53,98 @@ public class OpenTelemetryLogger {
private final Level logLevel; // Log4j Log Level
+ private final String localHostIp; // Local Host IP
+
private static final Logger LOG =
LoggerFactory.getLogger(OpenTelemetryLogger.class);
- public OpenTelemetryLogger() {
- // Default Service Name
- serviceName = "inlong-sort-connector";
- // Get OpenTelemetry Exporter Endpoint from Environment Variable
- if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) {
- endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT");
- } else {
- endpoint = "localhost:4317";
- }
- // Default Log4j Layout
- this.layout = PatternLayout.newBuilder()
- .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} -
%msg%n")
- .withCharset(StandardCharsets.UTF_8)
- .build();
- // Default Log4j Log Level
- this.logLevel = Level.INFO;
+ public OpenTelemetryLogger(Builder builder) {
+ this.serviceName = builder.serviceName;
+ this.endpoint = builder.endpoint;
+ this.layout = builder.layout;
+ this.logLevel = builder.logLevel;
+ this.localHostIp = builder.localHostIp;
}
- public OpenTelemetryLogger(String serviceName, String endpoint, Layout<?>
layout, Level logLevel) {
+ public OpenTelemetryLogger(String serviceName, String endpoint, Layout<?>
layout, Level logLevel,
+ String localHostIp) {
this.serviceName = serviceName;
this.endpoint = endpoint;
this.layout = layout;
this.logLevel = logLevel;
+ this.localHostIp = localHostIp;
+ }
+
+ /**
+ * OpenTelemetryLogger Builder
+ */
+ public static final class Builder {
+
+ private String endpoint; // OpenTelemetry Exporter Endpoint
+
+ private String serviceName; // OpenTelemetry Service Name
+
+ private Layout<?> layout; // Log4j Layout
+
+ private Level logLevel; // Log4j Log Level
+
+ private String localHostIp;
+
+ public Builder() {
+ }
+
+ public Builder setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ return this;
+ }
+
+ public Builder setLayout(Layout<?> layout) {
+ this.layout = layout;
+ return this;
+ }
+
+ public Builder setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ public Builder setLogLevel(Level logLevel) {
+ this.logLevel = logLevel;
+ return this;
+ }
+
+ public Builder setLocalHostIp(String localHostIp) {
+ this.localHostIp = localHostIp;
+ return this;
+ }
+
+ public OpenTelemetryLogger build() {
+ if (this.serviceName == null) {
+ this.serviceName = "unnamed_service";
+ }
+ if (this.endpoint == null) {
+ if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) {
+ this.endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT");
+ } else {
+ this.endpoint = "localhost:4317";
+ }
+ }
+ if (this.layout == null) {
+ this.layout = PatternLayout.newBuilder()
+ .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level
%logger{36} - %msg%n")
+ .withCharset(StandardCharsets.UTF_8)
+ .build();
+ }
+ if (this.logLevel == null) {
+ this.logLevel = Level.INFO;
+ }
+ return new OpenTelemetryLogger(this);
+ }
+
}
+ /**
+ * Create OpenTelemetry SDK with OpenTelemetry Exporter
+ */
private void createOpenTelemetrySdk() {
// Create OpenTelemetry SDK
OpenTelemetrySdkBuilder sdkBuilder = OpenTelemetrySdk.builder();
@@ -84,7 +152,9 @@ public class OpenTelemetryLogger {
SdkLoggerProviderBuilder loggerProviderBuilder =
SdkLoggerProvider.builder();
// get Resource
Resource resource = Resource.getDefault().toBuilder()
+ .put(ResourceAttributes.SERVICE_NAMESPACE, "inlong_sort")
.put(ResourceAttributes.SERVICE_NAME, this.serviceName)
+ .put(ResourceAttributes.HOST_NAME, this.localHostIp)
.build();
// set Resource
loggerProviderBuilder.setResource(resource);
@@ -102,7 +172,10 @@ public class OpenTelemetryLogger {
SDK = sdkBuilder.build();
}
- public void addOpenTelemetryAppender() {
+ /**
+ * Add OpenTelemetryAppender to Log4j
+ */
+ private void addOpenTelemetryAppender() {
org.apache.logging.log4j.spi.LoggerContext context =
LogManager.getContext(false);
LoggerContext loggerContext = (LoggerContext) context;
Configuration config = loggerContext.getConfiguration();
@@ -122,7 +195,10 @@ public class OpenTelemetryLogger {
loggerContext.updateLoggers();
}
- public void removeOpenTelemetryAppender() {
+ /**
+ * Remove OpenTelemetryAppender from Log4j
+ */
+ private void removeOpenTelemetryAppender() {
org.apache.logging.log4j.spi.LoggerContext context =
LogManager.getContext(false);
LoggerContext loggerContext = (LoggerContext) context;
Configuration config = loggerContext.getConfiguration();
@@ -137,6 +213,9 @@ public class OpenTelemetryLogger {
loggerContext.updateLoggers();
}
+ /**
+ * Install OpenTelemetryLogger for the application
+ */
public void install() {
addOpenTelemetryAppender();
createOpenTelemetrySdk();
@@ -144,6 +223,9 @@ public class OpenTelemetryLogger {
LOG.info("OpenTelemetryLogger installed");
}
+ /**
+ * Uninstall OpenTelemetryLogger
+ */
public void uninstall() {
LOG.info("OpenTelemetryLogger uninstalled");
SDK.close();