This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e2a26968c8 [INLONG-11349][Sort] Integrate opentelemetry for 
sort-connectors-v1.15 (#11351)
e2a26968c8 is described below

commit e2a26968c84f2929c269718d3a4bbb9913a8db88
Author: Haotian Ma <[email protected]>
AuthorDate: Tue Jan 7 15:23:52 2025 +0800

    [INLONG-11349][Sort] Integrate opentelemetry for sort-connectors-v1.15 
(#11351)
---
 .../inlong/sort/configuration/Constants.java       |  3 ++
 .../main/java/org/apache/inlong/sort/Entrance.java |  2 ++
 .../inlong/sort/tests/Kafka2StarRocksTest.java     | 20 +++++++++++++
 .../inlong/sort/tests/Mysql2StarRocksTest.java     | 21 ++++++++++++-
 .../inlong/sort/tests/Pulsar2StarRocksTest.java    | 19 ++++++++++++
 .../sort/tests/utils/FlinkContainerTestEnv.java    |  1 +
 .../tests/utils/FlinkContainerTestEnvJRE8.java     |  1 +
 .../sort/tests/utils/OpenTelemetryContainer.java   | 30 +++++++++++++++++++
 .../src/test/resources/env/otel-config.yaml        | 35 ++++++++++++++++++++++
 .../src/test/resources/flinkSql/mysql_test.sql     |  2 +-
 .../org/apache/inlong/sort/base/Constants.java     |  6 ++++
 .../sort-connectors/iceberg/pom.xml                |  2 ++
 .../inlong/sort/iceberg/source/IcebergSource.java  | 11 +++++--
 .../sort/iceberg/source/IcebergTableSource.java    |  4 +++
 .../iceberg/source/reader/IcebergSourceReader.java | 27 ++++++++++++++++-
 .../sort-flink-v1.15/sort-connectors/kafka/pom.xml |  2 ++
 .../inlong/sort/kafka/source/KafkaSource.java      |  8 +++--
 .../sort/kafka/source/KafkaSourceBuilder.java      |  9 +++++-
 .../kafka/source/reader/KafkaSourceReader.java     | 30 ++++++++++++++++++-
 .../sort/kafka/table/KafkaDynamicSource.java       | 18 +++++++----
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 10 +++++--
 .../table/UpsertKafkaDynamicTableFactory.java      |  5 ++--
 .../sort-connectors/mysql-cdc/pom.xml              |  2 ++
 .../apache/inlong/sort/mysql/MySqlTableSource.java |  7 +++++
 .../inlong/sort/mysql/MysqlTableFactory.java       |  4 ++-
 .../inlong/sort/mysql/source/MySqlSource.java      |  8 +++--
 .../sort/mysql/source/MySqlSourceBuilder.java      |  8 ++++-
 .../mysql/source/reader/MySqlSourceReader.java     | 25 +++++++++++++++-
 .../sort-connectors/pulsar/pom.xml                 |  2 ++
 .../inlong/sort/pulsar/PulsarTableFactory.java     | 10 +++----
 .../inlong/sort/pulsar/source/PulsarSource.java    |  8 +++--
 .../sort/pulsar/source/PulsarSourceBuilder.java    |  9 +++++-
 .../pulsar/source/PulsarSourceReaderFactory.java   |  9 ++++--
 .../source/reader/PulsarOrderedSourceReader.java   |  6 ++--
 .../source/reader/PulsarSourceReaderBase.java      | 25 +++++++++++++++-
 .../source/reader/PulsarUnorderedSourceReader.java |  6 ++--
 .../sort/pulsar/table/PulsarTableSource.java       | 16 +++++++---
 37 files changed, 365 insertions(+), 46 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 878b841076..a58febedfb 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -375,6 +375,9 @@ public class Constants {
     public static final ConfigOption<String> SQL_SCRIPT_FILE = 
key("sql.script.file").noDefaultValue()
             .withDescription("The file which is sql script and contains multi 
statement");
 
+    public static final ConfigOption<Boolean> ENABLE_LOG_REPORT = 
key("enable.log.report").defaultValue(false)
+            .withDescription("Whether to enable openTelemetry log report or 
not");
+
     // ------------------------------------------------------------------------
     // File format and compression related
     // ------------------------------------------------------------------------
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index a9744f793c..1c31227329 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -77,6 +77,8 @@ public class Entrance {
                 config.getString(Constants.UPSERT_MATERIALIZE));
         
tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
                 config.getString(Constants.NOT_NULL_ENFORCER));
+        
tableEnv.getConfig().getConfiguration().setBoolean(Constants.ENABLE_LOG_REPORT.key(),
+                config.getBoolean(Constants.ENABLE_LOG_REPORT));
         String sqlFile = config.getString(Constants.SQL_SCRIPT_FILE);
         Parser parser;
         if (StringUtils.isEmpty(sqlFile)) {
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
index 70e6b2413e..d6bcca2fa1 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests;
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
 import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
 import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
 import org.apache.inlong.sort.tests.utils.StarRocksContainer;
 import org.apache.inlong.sort.tests.utils.TestUtils;
@@ -34,6 +35,7 @@ import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
 
 import java.io.IOException;
 import java.net.URI;
@@ -111,6 +113,15 @@ public class Kafka2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
                     .withNetworkAliases("mysql")
                     .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
 
+    @ClassRule
+    public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
+            (OpenTelemetryContainer) new OpenTelemetryContainer()
+                    
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
+                            "/otel-config.yaml")
+                    .withCommand("--config=/otel-config.yaml")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("logcollector");
+
     @Before
     public void setup() {
         waitUntilJobRunning(Duration.ofSeconds(30));
@@ -152,6 +163,10 @@ public class Kafka2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
         if (STAR_ROCKS != null) {
             STAR_ROCKS.stop();
         }
+
+        if (OPEN_TELEMETRY_CONTAINER != null) {
+            OPEN_TELEMETRY_CONTAINER.stop();
+        }
     }
 
     private void initializeKafkaTable(String topic) {
@@ -223,5 +238,10 @@ public class Kafka2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
                 "test_output1",
                 3,
                 60000L);
+        // check log appender
+        String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
+        if (!logs.contains("OpenTelemetryLogger installed")) {
+            throw new Exception("Failure to append logs to OpenTelemetry");
+        }
     }
 }
\ No newline at end of file
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
index 6b7a5aa644..e2867d7f9d 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests;
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
 import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
 import org.apache.inlong.sort.tests.utils.StarRocksContainer;
 import org.apache.inlong.sort.tests.utils.TestUtils;
 
@@ -30,6 +31,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.MountableFile;
 
 import java.net.URISyntaxException;
 import java.nio.file.Path;
@@ -85,6 +87,15 @@ public class Mysql2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
                     .withNetworkAliases("mysql")
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
+    @ClassRule
+    public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
+            (OpenTelemetryContainer) new OpenTelemetryContainer()
+                    
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
+                            "/otel-config.yaml")
+                    .withCommand("--config=/otel-config.yaml")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("logcollector");
+
     @Before
     public void setup() {
         waitUntilJobRunning(Duration.ofSeconds(30));
@@ -121,6 +132,9 @@ public class Mysql2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
         if (STAR_ROCKS != null) {
             STAR_ROCKS.stop();
         }
+        if (OPEN_TELEMETRY_CONTAINER != null) {
+            OPEN_TELEMETRY_CONTAINER.stop();
+        }
     }
 
     /**
@@ -161,6 +175,11 @@ public class Mysql2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
                 expectResult,
                 "test_output1",
                 3,
-                60000L);
+                80000L);
+        // check log appender
+        String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
+        if (!logs.contains("OpenTelemetryLogger installed")) {
+            throw new Exception("Failure to append logs to OpenTelemetry");
+        }
     }
 }
\ No newline at end of file
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
index e5252d0b4a..9c40577778 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.tests;
 
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
 import org.apache.inlong.sort.tests.utils.StarRocksContainer;
 import org.apache.inlong.sort.tests.utils.TestUtils;
 
@@ -35,6 +36,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -91,6 +93,15 @@ public class Pulsar2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
                     .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
                     .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
+    @ClassRule
+    public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
+            (OpenTelemetryContainer) new OpenTelemetryContainer()
+                    
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
+                            "/otel-config.yaml")
+                    .withCommand("--config=/otel-config.yaml")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases("logcollector");
+
     @Before
     public void setup() {
         waitUntilJobRunning(Duration.ofSeconds(30));
@@ -119,6 +130,9 @@ public class Pulsar2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
         if (STAR_ROCKS != null) {
             STAR_ROCKS.stop();
         }
+        if (OPEN_TELEMETRY_CONTAINER != null) {
+            OPEN_TELEMETRY_CONTAINER.stop();
+        }
     }
 
     @Test
@@ -144,6 +158,11 @@ public class Pulsar2StarRocksTest extends 
FlinkContainerTestEnvJRE8 {
                 "1,Alice,Hello, Pulsar",
                 "2,Bob,Goodbye, Pulsar");
         proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 
60000L);
+        // check log appender
+        String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
+        if (!logs.contains("OpenTelemetryLogger installed")) {
+            throw new Exception("Failure to append logs to OpenTelemetry");
+        }
     }
 
 }
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index de6166442e..47ccd67f33 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -130,6 +130,7 @@ public abstract class FlinkContainerTestEnv extends 
TestLogger {
         commands.add(copyToContainerTmpPath(jobManager, 
constructDistJar(jars)));
         commands.add("--sql.script.file");
         commands.add(containerSqlFile);
+        commands.add("--enable.log.report true");
 
         ExecResult execResult =
                 jobManager.execInContainer("bash", "-c", String.join(" ", 
commands));
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
index a59d9c9e98..83c14a113b 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
@@ -45,6 +45,7 @@ public abstract class FlinkContainerTestEnvJRE8 extends 
FlinkContainerTestEnv {
                         .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
                         .withExposedPorts(DEBUG_PORT)
                         .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withEnv("OTEL_EXPORTER_ENDPOINT", "logcollector:4317")
                         .dependsOn(jobManager)
                         .withLogConsumer(new Slf4jLogConsumer(TM_LOG));
 
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java
new file mode 100644
index 0000000000..0d07e85a9e
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.testcontainers.containers.GenericContainer;
+
+public class OpenTelemetryContainer extends GenericContainer {
+
+    public static final String IMAGE = 
"otel/opentelemetry-collector-contrib:0.110.0";
+    public static final Integer PORT = 4317;
+    public OpenTelemetryContainer() {
+        super(IMAGE);
+        addExposedPort(PORT);
+    }
+}
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml
new file mode 100644
index 0000000000..ad48b9cb07
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: logcollector:4317
+processors:
+  batch:
+
+exporters:
+  debug:
+    verbosity: detailed
+
+service:
+  pipelines:
+    logs:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [debug]
\ No newline at end of file
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
index 9f74d54ae7..6c4a3efb84 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
@@ -10,7 +10,7 @@ CREATE TABLE test_input1 (
     'password' = 'inlong',
     'database-name' = 'test',
     'table-name' = 'test_input1',
-    'scan.incremental.snapshot.enabled' = 'false',
+    'scan.incremental.snapshot.enabled' = 'true',
     'jdbc.properties.useSSL' = 'false',
     'jdbc.properties.allowPublicKeyRetrieval' = 'true'
     );
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 7e02b0ca96..d2480be58f 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -469,4 +469,10 @@ public final class Constants {
                     .withDescription(
                             "pulsar client auth params");
 
+    public static final ConfigOption<Boolean> ENABLE_LOG_REPORT =
+            ConfigOptions.key("enable.log.report")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether enable openTelemetry log report 
or not.");
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
index 7559d70bce..f30e2050b3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
@@ -151,6 +151,8 @@
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.*</include>
                                     <include>org.apache.inlong:*</include>
                                     <include>com.google.protobuf:*</include>
                                     <include>org.apache.kafka:*</include>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
index adff22e7cd..7b1b334317 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.iceberg.source;
 
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
 import org.apache.inlong.sort.iceberg.source.reader.IcebergSourceReader;
@@ -94,6 +95,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     private final SplitAssignerFactory assignerFactory;
 
     private final MetricOption metricOption;
+    private final boolean enableLogReport;
 
     // Can't use SerializableTable as enumerator needs a regular table
     // that can discover table changes
@@ -105,13 +107,15 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
             ReaderFunction<T> readerFunction,
             SplitAssignerFactory assignerFactory,
             Table table,
-            MetricOption metricOption) {
+            MetricOption metricOption,
+            boolean enableLogReport) {
         this.tableLoader = tableLoader;
         this.scanContext = scanContext;
         this.readerFunction = readerFunction;
         this.assignerFactory = assignerFactory;
         this.table = table;
         this.metricOption = metricOption;
+        this.enableLogReport = enableLogReport;
     }
 
     String name() {
@@ -167,7 +171,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
         InlongIcebergSourceReaderMetrics<T> metrics =
                 new 
InlongIcebergSourceReaderMetrics<>(readerContext.metricGroup(), 
lazyTable().name());
         metrics.registerMetrics(metricOption);
-        return new IcebergSourceReader<>(metrics, readerFunction, 
readerContext);
+        return new IcebergSourceReader<>(metrics, readerFunction, 
readerContext, enableLogReport);
     }
 
     @Override
@@ -522,9 +526,10 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
             }
             resolveMetricOption();
             checkRequired();
+            boolean enableLogReport = 
flinkConfig.get(Constants.ENABLE_LOG_REPORT);
             // Since builder already load the table, pass it to the source to 
avoid double loading
             return new IcebergSource<T>(
-                    tableLoader, context, readerFunction, 
splitAssignerFactory, table, metricOption);
+                    tableLoader, context, readerFunction, 
splitAssignerFactory, table, metricOption, enableLogReport);
         }
 
         private void checkRequired() {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
index f55f63fadd..7f5298a7bf 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.iceberg.source;
 
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.iceberg.IcebergReadableMetadata;
 import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
 
@@ -83,6 +84,7 @@ public class IcebergTableSource
     private final Map<String, String> properties;
     private final boolean isLimitPushDown;
     private final ReadableConfig readableConfig;
+    private final boolean enableLogReport;
 
     private IcebergTableSource(IcebergTableSource toCopy) {
         this.loader = toCopy.loader;
@@ -95,6 +97,7 @@ public class IcebergTableSource
         this.readableConfig = toCopy.readableConfig;
         this.producedDataType = toCopy.producedDataType;
         this.metadataKeys = toCopy.metadataKeys;
+        this.enableLogReport = toCopy.enableLogReport;
     }
 
     public IcebergTableSource(
@@ -124,6 +127,7 @@ public class IcebergTableSource
         this.readableConfig = readableConfig;
         this.producedDataType = schema.toPhysicalRowDataType();
         this.metadataKeys = new ArrayList<>();
+        this.enableLogReport = readableConfig.get(Constants.ENABLE_LOG_REPORT);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index a8ad9b5259..e97adc01d4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.iceberg.source.reader;
 
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
@@ -25,6 +27,7 @@ import 
org.apache.iceberg.flink.source.reader.RecordAndPosition;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.flink.source.split.SplitRequestEvent;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.logging.log4j.Level;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -40,20 +43,34 @@ public class IcebergSourceReader<T>
             SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, 
IcebergSourceSplit, IcebergSourceSplit> {
 
     private final InlongIcebergSourceReaderMetrics<T> metrics;
+    private OpenTelemetryLogger openTelemetryLogger;
+    private final boolean enableLogReport;
+
     public IcebergSourceReader(
             InlongIcebergSourceReaderMetrics<T> metrics,
             ReaderFunction<T> readerFunction,
-            SourceReaderContext context) {
+            SourceReaderContext context,
+            boolean enableLogReport) {
         super(
                 () -> new IcebergSourceSplitReader<>(metrics, readerFunction, 
context),
                 new IcebergSourceRecordEmitter<>(),
                 context.getConfiguration(),
                 context);
         this.metrics = metrics;
+        this.enableLogReport = enableLogReport;
+        if (this.enableLogReport) {
+            this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+                    .setLogLevel(Level.ERROR)
+                    .setServiceName(this.getClass().getSimpleName())
+                    .setLocalHostIp(this.context.getLocalHostName()).build();
+        }
     }
 
     @Override
     public void start() {
+        if (this.enableLogReport) {
+            this.openTelemetryLogger.install();
+        }
         // We request a split only if we did not get splits during the 
checkpoint restore.
         // Otherwise, reader restarts will keep requesting more and more 
splits.
         if (getNumberOfCurrentlyAssignedSplits() == 0) {
@@ -61,6 +78,14 @@ public class IcebergSourceReader<T>
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (this.enableLogReport) {
+            openTelemetryLogger.uninstall();
+        }
+    }
+
     @Override
     protected void onSplitFinished(Map<String, IcebergSourceSplit> 
finishedSplitIds) {
         requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index 23c24f0f2e..38cf7d6cb4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -76,6 +76,8 @@
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.*</include>
                                     <include>org.apache.inlong:*</include>
                                     <include>org.apache.kafka:*</include>
                                     
<include>org.apache.flink:flink-connector-kafka</include>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
index 5004ec34a3..7fe014c54c 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -101,6 +101,7 @@ public class KafkaSource<OUT>
     private final KafkaDeserializationSchema<RowData> metricSchema;
     // The configurations.
     private final Properties props;
+    private final boolean enableLogReport;
 
     KafkaSource(
             KafkaSubscriber subscriber,
@@ -109,7 +110,8 @@ public class KafkaSource<OUT>
             Boundedness boundedness,
             KafkaRecordDeserializationSchema<OUT> deserializationSchema,
             KafkaDeserializationSchema<RowData> metricSchema,
-            Properties props) {
+            Properties props,
+            boolean enableLogReport) {
         this.subscriber = subscriber;
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
@@ -117,6 +119,7 @@ public class KafkaSource<OUT>
         this.deserializationSchema = deserializationSchema;
         this.metricSchema = metricSchema;
         this.props = props;
+        this.enableLogReport = enableLogReport;
     }
 
     /**
@@ -175,7 +178,8 @@ public class KafkaSource<OUT>
                 toConfiguration(props),
                 readerContext,
                 kafkaSourceReaderMetrics,
-                metricSchema);
+                metricSchema,
+                enableLogReport);
     }
 
     @Internal
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
index 58bb651b24..7e0422e481 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
@@ -105,6 +105,7 @@ public class KafkaSourceBuilder<OUT> {
     private KafkaDeserializationSchema<RowData> metricSchema;
     // The configurations.
     protected Properties props;
+    private boolean enableLogReport;
 
     KafkaSourceBuilder() {
         this.subscriber = null;
@@ -407,6 +408,11 @@ public class KafkaSourceBuilder<OUT> {
         return this;
     }
 
+    public KafkaSourceBuilder<OUT> enableLogReport(boolean enableLogReport) {
+        this.enableLogReport = enableLogReport;
+        return this;
+    }
+
     /**
      * Build the {@link KafkaSource}.
      *
@@ -422,7 +428,8 @@ public class KafkaSourceBuilder<OUT> {
                 boundedness,
                 deserializationSchema,
                 metricSchema,
-                props);
+                props,
+                enableLogReport);
     }
 
     // ------------- private helpers --------------
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
index 4643887c49..9474269a8a 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.kafka.source.reader;
 
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
 
 import org.apache.flink.annotation.Internal;
@@ -37,6 +38,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.logging.log4j.Level;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +68,8 @@ public class KafkaSourceReader<T>
     private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
     private final boolean commitOffsetsOnCheckpoint;
     private final KafkaDeserializationSchema<RowData> metricSchema;
+    private OpenTelemetryLogger openTelemetryLogger;
+    private final boolean enableLogReport;
 
     public KafkaSourceReader(
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], 
byte[]>>> elementsQueue,
@@ -74,7 +78,8 @@ public class KafkaSourceReader<T>
             Configuration config,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
-            KafkaDeserializationSchema<RowData> metricSchema) {
+            KafkaDeserializationSchema<RowData> metricSchema,
+            boolean enableLogReport) {
         super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, 
context);
         this.offsetsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
@@ -82,11 +87,34 @@ public class KafkaSourceReader<T>
         this.commitOffsetsOnCheckpoint =
                 config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
         this.metricSchema = metricSchema;
+        this.enableLogReport = enableLogReport;
         if (!commitOffsetsOnCheckpoint) {
             LOG.warn(
                     "Offset commit on checkpoint is disabled. "
                             + "Consuming offset will not be reported back to 
Kafka cluster.");
         }
+        if (this.enableLogReport) {
+            this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+                    .setLogLevel(Level.ERROR)
+                    .setServiceName(this.getClass().getSimpleName())
+                    .setLocalHostIp(this.context.getLocalHostName()).build();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (this.enableLogReport) {
+            this.openTelemetryLogger.install();
+        }
+        super.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (this.enableLogReport) {
+            openTelemetryLogger.uninstall();
+        }
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 9b0b0aff64..ef7e34778c 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -163,6 +163,8 @@ public class KafkaDynamicSource
 
     private final MetricOption metricOption;
 
+    private final boolean enableLogReport;
+
     public KafkaDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat,
@@ -178,7 +180,8 @@ public class KafkaDynamicSource
             long startupTimestampMillis,
             boolean upsertMode,
             String tableIdentifier,
-            MetricOption metricOption) {
+            MetricOption metricOption,
+            boolean enableLogReport) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -213,6 +216,7 @@ public class KafkaDynamicSource
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
         this.metricOption = metricOption;
+        this.enableLogReport = enableLogReport;
     }
 
     @Override
@@ -328,7 +332,8 @@ public class KafkaDynamicSource
                         startupTimestampMillis,
                         upsertMode,
                         tableIdentifier,
-                        metricOption);
+                        metricOption,
+                        enableLogReport);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -366,7 +371,8 @@ public class KafkaDynamicSource
                 && Objects.equals(upsertMode, that.upsertMode)
                 && Objects.equals(tableIdentifier, that.tableIdentifier)
                 && Objects.equals(watermarkStrategy, that.watermarkStrategy)
-                && Objects.equals(metricOption, that.metricOption);
+                && Objects.equals(metricOption, that.metricOption)
+                && Objects.equals(enableLogReport, that.enableLogReport);
     }
 
     @Override
@@ -389,7 +395,8 @@ public class KafkaDynamicSource
                 upsertMode,
                 tableIdentifier,
                 watermarkStrategy,
-                metricOption);
+                metricOption,
+                enableLogReport);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -443,7 +450,8 @@ public class KafkaDynamicSource
         kafkaSourceBuilder
                 .setProperties(properties)
                 
.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer))
-                .setMetricSchema(kafkaDeserializer);
+                .setMetricSchema(kafkaDeserializer)
+                .enableLogReport(enableLogReport);
         return kafkaSourceBuilder.build();
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 3a320b7f88..1070039174 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -200,6 +200,7 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
         String auditKeys = tableOptions.get(AUDIT_KEYS);
+        Boolean enableLogReport = 
context.getConfiguration().get(ENABLE_LOG_REPORT);
 
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
@@ -220,7 +221,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
                 context.getObjectIdentifier().asSummaryString(),
-                metricOption);
+                metricOption,
+                enableLogReport);
     }
 
     @Override
@@ -387,7 +389,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
             String tableIdentifier,
-            MetricOption metricOption) {
+            MetricOption metricOption,
+            boolean enableLogReport) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -403,7 +406,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 startupTimestampMillis,
                 false,
                 tableIdentifier,
-                metricOption);
+                metricOption,
+                enableLogReport);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6a8f5e20c9..2173ee5718 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -133,7 +133,7 @@ public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
         String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
         String auditKeys = tableOptions.get(AUDIT_KEYS);
-
+        Boolean enableLogReport = 
context.getConfiguration().get(ENABLE_LOG_REPORT);
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withAuditAddress(auditHostAndPorts)
@@ -155,7 +155,8 @@ public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
                 0,
                 true,
                 context.getObjectIdentifier().asSummaryString(),
-                metricOption);
+                metricOption,
+                enableLogReport);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
index 3e080b5b8b..687d8f4f0f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
@@ -95,6 +95,8 @@
                             
<createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.*</include>
                                     <include>org.apache.inlong:*</include>
                                     <include>io.debezium:debezium-api</include>
                                     
<include>io.debezium:debezium-embedded</include>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
index 55aec6ed1c..3273f3ee2d 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
@@ -86,6 +86,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
     private final Properties jdbcProperties;
     private final Duration heartbeatInterval;
     private final String chunkKeyColumn;
+    private final boolean enableLogReport;
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -121,6 +122,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
             double distributionFactorLower,
             StartupOptions startupOptions,
             boolean scanNewlyAddedTableEnabled,
+            boolean enableLogReport,
             Properties jdbcProperties,
             Duration heartbeatInterval,
             @Nullable String chunkKeyColumn,
@@ -136,6 +138,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
         this.serverTimeZone = serverTimeZone;
         this.dbzProperties = dbzProperties;
         this.enableParallelRead = enableParallelRead;
+        this.enableLogReport = enableLogReport;
         this.splitSize = splitSize;
         this.splitMetaGroupSize = splitMetaGroupSize;
         this.fetchSize = fetchSize;
@@ -206,6 +209,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                             .startupOptions(startupOptions)
                             .deserializer(deserializer)
                             
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                            .enableLogReport(enableLogReport)
                             .jdbcProperties(jdbcProperties)
                             .heartbeatInterval(heartbeatInterval)
                             .chunkKeyColumn(chunkKeyColumn)
@@ -285,6 +289,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                         distributionFactorLower,
                         startupOptions,
                         scanNewlyAddedTableEnabled,
+                        enableLogReport,
                         jdbcProperties,
                         heartbeatInterval,
                         chunkKeyColumn,
@@ -311,6 +316,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 && distributionFactorUpper == that.distributionFactorUpper
                 && distributionFactorLower == that.distributionFactorLower
                 && scanNewlyAddedTableEnabled == 
that.scanNewlyAddedTableEnabled
+                && enableLogReport == that.enableLogReport
                 && Objects.equals(physicalSchema, that.physicalSchema)
                 && Objects.equals(hostname, that.hostname)
                 && Objects.equals(database, that.database)
@@ -358,6 +364,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 producedDataType,
                 metadataKeys,
                 scanNewlyAddedTableEnabled,
+                enableLogReport,
                 jdbcProperties,
                 heartbeatInterval,
                 chunkKeyColumn,
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index f903780a36..0dc854dfa2 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.mysql;
 
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption;
 
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
@@ -69,6 +70,7 @@ public class MysqlTableFactory implements 
DynamicTableSourceFactory {
         final String username = config.get(USERNAME);
         final String password = config.get(PASSWORD);
         final String databaseName = config.get(DATABASE_NAME);
+        final Boolean enableLogReport = 
context.getConfiguration().get(Constants.ENABLE_LOG_REPORT);
         validateRegex(DATABASE_NAME.key(), databaseName);
         final String tableName = config.get(TABLE_NAME);
         validateRegex(TABLE_NAME.key(), tableName);
@@ -129,6 +131,7 @@ public class MysqlTableFactory implements 
DynamicTableSourceFactory {
                 distributionFactorLower,
                 startupOptions,
                 scanNewlyAddedTableEnabled,
+                enableLogReport,
                 getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
                 chunkKeyColumn,
@@ -337,7 +340,6 @@ public class MysqlTableFactory implements 
DynamicTableSourceFactory {
                             + "\"-U\" represents UPDATE_BEFORE.\n"
                             + "\"+U\" represents UPDATE_AFTER.\n"
                             + "\"-D\" represents DELETE.");
-
     // 
----------------------------------------------------------------------------
     // experimental options, won't add them to documentation
     // 
----------------------------------------------------------------------------
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
index da1dcbadae..3d4af2caae 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
@@ -103,6 +103,7 @@ public class MySqlSource<T>
 
     private final MySqlSourceConfigFactory configFactory;
     private final DebeziumDeserializationSchema<T> deserializationSchema;
+    private final boolean enableLogReport;
 
     /**
      * Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}.
@@ -116,9 +117,11 @@ public class MySqlSource<T>
 
     MySqlSource(
             MySqlSourceConfigFactory configFactory,
-            DebeziumDeserializationSchema<T> deserializationSchema) {
+            DebeziumDeserializationSchema<T> deserializationSchema,
+            boolean enableLogReport) {
         this.configFactory = configFactory;
         this.deserializationSchema = deserializationSchema;
+        this.enableLogReport = enableLogReport;
     }
 
     public MySqlSourceConfigFactory getConfigFactory() {
@@ -168,7 +171,8 @@ public class MySqlSource<T>
                 readerContext.getConfiguration(),
                 mySqlSourceReaderContext,
                 sourceConfig,
-                deserializationSchema);
+                deserializationSchema,
+                enableLogReport);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
index 775da14035..3b4b7ff811 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
@@ -51,6 +51,7 @@ public class MySqlSourceBuilder<T> {
 
     private final MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
     private DebeziumDeserializationSchema<T> deserializer;
+    private boolean enableLogReport;
 
     public MySqlSourceBuilder<T> hostname(String hostname) {
         this.configFactory.hostname(hostname);
@@ -235,12 +236,17 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    public MySqlSourceBuilder<T> enableLogReport(boolean enableLogReport) {
+        this.enableLogReport = enableLogReport;
+        return this;
+    }
+
     /**
      * Build the MySqlSource
      *
      * @return a MySqlParallelSource with the settings made for this builder.
      */
     public MySqlSource<T> build() {
-        return new MySqlSource<>(configFactory, checkNotNull(deserializer));
+        return new MySqlSource<>(configFactory, checkNotNull(deserializer), 
enableLogReport);
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
index 01f34f28b1..6fbd7b96ba 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.mysql.source.reader;
 
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
 import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;
 
 import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
@@ -55,6 +56,7 @@ import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.logging.log4j.Level;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +90,8 @@ public class MySqlSourceReader<T>
     private final MySqlSourceReaderContext mySqlSourceReaderContext;
     private MySqlBinlogSplit suspendedBinlogSplit;
     private final DebeziumDeserializationSchema<T> metricSchema;
+    private OpenTelemetryLogger openTelemetryLogger;
+    private final boolean enableLogReport;
 
     public MySqlSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementQueue,
@@ -95,7 +99,8 @@ public class MySqlSourceReader<T>
             RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
             Configuration config,
             MySqlSourceReaderContext context,
-            MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema<T> 
metricSchema) {
+            MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema<T> 
metricSchema,
+            boolean enableLogReport) {
         super(
                 elementQueue,
                 new SingleThreadFetcherManager<>(elementQueue, 
splitReaderSupplier::get),
@@ -109,15 +114,33 @@ public class MySqlSourceReader<T>
         this.mySqlSourceReaderContext = context;
         this.suspendedBinlogSplit = null;
         this.metricSchema = metricSchema;
+        this.enableLogReport = enableLogReport;
+        if (enableLogReport) {
+            this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+                    .setLogLevel(Level.ERROR)
+                    .setServiceName(this.getClass().getSimpleName())
+                    .setLocalHostIp(this.context.getLocalHostName()).build();
+        }
     }
 
     @Override
     public void start() {
+        if (enableLogReport) {
+            openTelemetryLogger.install();
+        }
         if (getNumberOfCurrentlyAssignedSplits() == 0) {
             context.sendSplitRequest();
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (enableLogReport) {
+            openTelemetryLogger.uninstall();
+        }
+    }
+
     @Override
     protected MySqlSplitState initializedState(MySqlSplit split) {
         if (split.isSnapshotSplit()) {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index 3e6a1b3ee2..97cc31232e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -242,6 +242,8 @@
                             
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
                             <artifactSet>
                                 <includes>
+                                    <include>io.opentelemetry*</include>
+                                    <include>com.squareup.*</include>
                                     <include>org.apache.inlong:*</include>
                                     
<include>io.streamnative.connectors:pulsar-flink-connector-origin*</include>
                                     
<include>io.streamnative.connectors:flink-protobuf</include>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 780dfde8b5..7fa2a4cddc 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -48,9 +48,7 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
-import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.*;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createKeyFormatProjection;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createValueFormatProjection;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getKeyDecodingFormat;
@@ -125,7 +123,7 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
         final StartCursor startCursor = getStartCursor(tableOptions);
         final StopCursor stopCursor = getStopCursor(tableOptions);
         final SubscriptionType subscriptionType = 
getSubscriptionType(tableOptions);
-
+        final boolean enableLogReport = 
context.getConfiguration().get(ENABLE_LOG_REPORT);
         // Forward source configs
         final Properties properties = getPulsarProperties(tableOptions);
         properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
@@ -163,7 +161,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
         final DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForMetadataPushdown =
                 valueDecodingFormat;
         final ChangelogMode changelogMode = 
decodingFormatForMetadataPushdown.getChangelogMode();
-
         return new PulsarTableSource(
                 deserializationSchemaFactory,
                 decodingFormatForMetadataPushdown,
@@ -172,7 +169,8 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 properties,
                 startCursor,
                 stopCursor,
-                subscriptionType);
+                subscriptionType,
+                enableLogReport);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
index b7b608ec82..756d2cb1a8 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
@@ -91,6 +91,8 @@ public final class PulsarSource<OUT>
     /** The pulsar deserialization schema used for deserializing message. */
     private final PulsarDeserializationSchema<OUT> deserializationSchema;
 
+    private final boolean enableLogReport;
+
     /**
      * The constructor for PulsarSource, it's package protected for forcing 
using {@link
      * PulsarSourceBuilder}.
@@ -102,7 +104,8 @@ public final class PulsarSource<OUT>
             StartCursor startCursor,
             StopCursor stopCursor,
             Boundedness boundedness,
-            PulsarDeserializationSchema<OUT> deserializationSchema) {
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            boolean enableLogReport) {
         this.sourceConfiguration = sourceConfiguration;
         this.subscriber = subscriber;
         this.rangeGenerator = rangeGenerator;
@@ -110,6 +113,7 @@ public final class PulsarSource<OUT>
         this.stopCursor = stopCursor;
         this.boundedness = boundedness;
         this.deserializationSchema = deserializationSchema;
+        this.enableLogReport = enableLogReport;
     }
 
     /**
@@ -136,7 +140,7 @@ public final class PulsarSource<OUT>
         deserializationSchema.open(initializationContext, sourceConfiguration);
 
         return PulsarSourceReaderFactory.create(
-                readerContext, deserializationSchema, sourceConfiguration);
+                readerContext, deserializationSchema, sourceConfiguration, 
enableLogReport);
     }
 
     @Internal
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
index d8c05ce152..f783aa3bcf 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
@@ -127,6 +127,7 @@ public final class PulsarSourceBuilder<OUT> {
     private StopCursor stopCursor;
     private Boundedness boundedness;
     private PulsarDeserializationSchema<OUT> deserializationSchema;
+    private boolean enableLogReport;
 
     // private builder constructor.
     PulsarSourceBuilder() {
@@ -412,6 +413,11 @@ public final class PulsarSourceBuilder<OUT> {
         return this;
     }
 
+    public PulsarSourceBuilder<OUT> enableLogReport(boolean enableLogReport) {
+        this.enableLogReport = enableLogReport;
+        return this;
+    }
+
     /**
      * Build the {@link PulsarSource}.
      *
@@ -498,7 +504,8 @@ public final class PulsarSourceBuilder<OUT> {
                 startCursor,
                 stopCursor,
                 boundedness,
-                deserializationSchema);
+                deserializationSchema,
+                enableLogReport);
     }
 
     // ------------- private helpers --------------
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
index bbc42c149f..3bccfda1f3 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
@@ -64,7 +64,8 @@ public final class PulsarSourceReaderFactory {
     public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(
             SourceReaderContext readerContext,
             PulsarDeserializationSchema<OUT> deserializationSchema,
-            SourceConfiguration sourceConfiguration) {
+            SourceConfiguration sourceConfiguration,
+            boolean enableLogReport) {
 
         PulsarClient pulsarClient = createClient(sourceConfiguration);
         PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
@@ -93,7 +94,8 @@ public final class PulsarSourceReaderFactory {
                     sourceConfiguration,
                     pulsarClient,
                     pulsarAdmin,
-                    deserializationSchema);
+                    deserializationSchema,
+                    enableLogReport);
         } else if (subscriptionType == SubscriptionType.Shared
                 || subscriptionType == SubscriptionType.Key_Shared) {
             TransactionCoordinatorClient coordinatorClient =
@@ -119,7 +121,8 @@ public final class PulsarSourceReaderFactory {
                     pulsarClient,
                     pulsarAdmin,
                     coordinatorClient,
-                    deserializationSchema);
+                    deserializationSchema,
+                    enableLogReport);
         } else {
             throw new UnsupportedOperationException(
                     "This subscription type is not " + subscriptionType + " 
supported currently.");
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
index 3c75793f93..75ac35c025 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
@@ -83,14 +83,16 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
-            PulsarDeserializationSchema<OUT> deserializationSchema) {
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            boolean enableLogReport) {
         super(
                 elementsQueue,
                 new PulsarOrderedFetcherManager<>(elementsQueue, 
splitReaderSupplier::get),
                 context,
                 sourceConfiguration,
                 pulsarClient,
-                pulsarAdmin);
+                pulsarAdmin,
+                enableLogReport);
 
         this.cursorsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
index f7e6bafc1d..3d9bc6a517 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.pulsar.source.reader;
 
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
+
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
@@ -44,6 +46,8 @@ abstract class PulsarSourceReaderBase<OUT>
     protected final SourceConfiguration sourceConfiguration;
     protected final PulsarClient pulsarClient;
     protected final PulsarAdmin pulsarAdmin;
+    private OpenTelemetryLogger openTelemetryLogger;
+    protected final boolean enableLogReport;
 
     protected PulsarSourceReaderBase(
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
@@ -51,7 +55,8 @@ abstract class PulsarSourceReaderBase<OUT>
             SourceReaderContext context,
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin) {
+            PulsarAdmin pulsarAdmin,
+            boolean enableLogReport) {
         super(
                 elementsQueue,
                 splitFetcherManager,
@@ -62,6 +67,13 @@ abstract class PulsarSourceReaderBase<OUT>
         this.sourceConfiguration = sourceConfiguration;
         this.pulsarClient = pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
+        this.enableLogReport = enableLogReport;
+        if (enableLogReport) {
+            this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+                    .setLogLevel(org.apache.logging.log4j.Level.ERROR)
+                    .setServiceName(this.getClass().getSimpleName())
+                    .setLocalHostIp(this.context.getLocalHostName()).build();
+        }
     }
 
     @Override
@@ -75,6 +87,14 @@ abstract class PulsarSourceReaderBase<OUT>
         return splitState.toPulsarPartitionSplit();
     }
 
+    @Override
+    public void start() {
+        if (enableLogReport) {
+            this.openTelemetryLogger.install();
+        }
+        super.start();
+    }
+
     @Override
     public void close() throws Exception {
         // Close the all the consumers first.
@@ -83,5 +103,8 @@ abstract class PulsarSourceReaderBase<OUT>
         // Close shared pulsar resources.
         pulsarClient.shutdown();
         pulsarAdmin.close();
+        if (enableLogReport) {
+            openTelemetryLogger.uninstall();
+        }
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
index adf15de0b1..f30d7b3477 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
@@ -81,14 +81,16 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
             @Nullable TransactionCoordinatorClient coordinatorClient,
-            PulsarDeserializationSchema<OUT> deserializationSchema) {
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            boolean enableLogReport) {
         super(
                 elementsQueue,
                 new PulsarUnorderedFetcherManager<>(elementsQueue, 
splitReaderSupplier::get),
                 context,
                 sourceConfiguration,
                 pulsarClient,
-                pulsarAdmin);
+                pulsarAdmin,
+                enableLogReport);
         this.coordinatorClient = coordinatorClient;
         this.transactionsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.transactionsOfFinishedSplits = Collections.synchronizedList(new 
ArrayList<>());
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
index e5df548650..cd73a7bfce 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -86,6 +86,8 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
 
     private final SubscriptionType subscriptionType;
 
+    private final boolean enableLogReport;
+
     public PulsarTableSource(
             PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory,
             DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForReadingMetadata,
@@ -94,7 +96,8 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
             Properties properties,
             StartCursor startCursor,
             StopCursor stopCursor,
-            SubscriptionType subscriptionType) {
+            SubscriptionType subscriptionType,
+            boolean enableLogReport) {
         // Format attributes
         this.deserializationSchemaFactory = 
checkNotNull(deserializationSchemaFactory);
         this.decodingFormatForReadingMetadata = 
checkNotNull(decodingFormatForReadingMetadata);
@@ -105,6 +108,7 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
         this.startCursor = checkNotNull(startCursor);
         this.stopCursor = checkNotNull(stopCursor);
         this.subscriptionType = checkNotNull(subscriptionType);
+        this.enableLogReport = enableLogReport;
     }
 
     @Override
@@ -127,6 +131,7 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
                         // only support exclusive since shared mode requires 
pulsar with transaction enabled
                         // and supporting transaction consumes more resources 
in pulsar broker
                         .setSubscriptionType(SubscriptionType.Exclusive)
+                        .enableLogReport(enableLogReport)
                         .build();
         return SourceProvider.of(source);
     }
@@ -194,7 +199,8 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
                 properties,
                 startCursor,
                 stopCursor,
-                subscriptionType);
+                subscriptionType,
+                enableLogReport);
     }
 
     @Override
@@ -215,7 +221,8 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
                 && Objects.equals(properties, that.properties)
                 && Objects.equals(startCursor, that.startCursor)
                 && Objects.equals(stopCursor, that.stopCursor)
-                && subscriptionType == that.subscriptionType;
+                && subscriptionType == that.subscriptionType
+                && Objects.equals(enableLogReport, that.enableLogReport);
     }
 
     @Override
@@ -228,6 +235,7 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
                 properties,
                 startCursor,
                 stopCursor,
-                subscriptionType);
+                subscriptionType,
+                enableLogReport);
     }
 }

Reply via email to