This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e2a26968c8 [INLONG-11349][Sort] Integrate opentelemetry for sort-connectors-v1.15 (#11351)
e2a26968c8 is described below
commit e2a26968c84f2929c269718d3a4bbb9913a8db88
Author: Haotian Ma <[email protected]>
AuthorDate: Tue Jan 7 15:23:52 2025 +0800
[INLONG-11349][Sort] Integrate opentelemetry for sort-connectors-v1.15 (#11351)
---
.../inlong/sort/configuration/Constants.java | 3 ++
.../main/java/org/apache/inlong/sort/Entrance.java | 2 ++
.../inlong/sort/tests/Kafka2StarRocksTest.java | 20 +++++++++++++
.../inlong/sort/tests/Mysql2StarRocksTest.java | 21 ++++++++++++-
.../inlong/sort/tests/Pulsar2StarRocksTest.java | 19 ++++++++++++
.../sort/tests/utils/FlinkContainerTestEnv.java | 1 +
.../tests/utils/FlinkContainerTestEnvJRE8.java | 1 +
.../sort/tests/utils/OpenTelemetryContainer.java | 30 +++++++++++++++++++
.../src/test/resources/env/otel-config.yaml | 35 ++++++++++++++++++++++
.../src/test/resources/flinkSql/mysql_test.sql | 2 +-
.../org/apache/inlong/sort/base/Constants.java | 6 ++++
.../sort-connectors/iceberg/pom.xml | 2 ++
.../inlong/sort/iceberg/source/IcebergSource.java | 11 +++++--
.../sort/iceberg/source/IcebergTableSource.java | 4 +++
.../iceberg/source/reader/IcebergSourceReader.java | 27 ++++++++++++++++-
.../sort-flink-v1.15/sort-connectors/kafka/pom.xml | 2 ++
.../inlong/sort/kafka/source/KafkaSource.java | 8 +++--
.../sort/kafka/source/KafkaSourceBuilder.java | 9 +++++-
.../kafka/source/reader/KafkaSourceReader.java | 30 ++++++++++++++++++-
.../sort/kafka/table/KafkaDynamicSource.java | 18 +++++++----
.../sort/kafka/table/KafkaDynamicTableFactory.java | 10 +++++--
.../table/UpsertKafkaDynamicTableFactory.java | 5 ++--
.../sort-connectors/mysql-cdc/pom.xml | 2 ++
.../apache/inlong/sort/mysql/MySqlTableSource.java | 7 +++++
.../inlong/sort/mysql/MysqlTableFactory.java | 4 ++-
.../inlong/sort/mysql/source/MySqlSource.java | 8 +++--
.../sort/mysql/source/MySqlSourceBuilder.java | 8 ++++-
.../mysql/source/reader/MySqlSourceReader.java | 25 +++++++++++++++-
.../sort-connectors/pulsar/pom.xml | 2 ++
.../inlong/sort/pulsar/PulsarTableFactory.java | 10 +++----
.../inlong/sort/pulsar/source/PulsarSource.java | 8 +++--
.../sort/pulsar/source/PulsarSourceBuilder.java | 9 +++++-
.../pulsar/source/PulsarSourceReaderFactory.java | 9 ++++--
.../source/reader/PulsarOrderedSourceReader.java | 6 ++--
.../source/reader/PulsarSourceReaderBase.java | 25 +++++++++++++++-
.../source/reader/PulsarUnorderedSourceReader.java | 6 ++--
.../sort/pulsar/table/PulsarTableSource.java | 16 +++++++---
37 files changed, 365 insertions(+), 46 deletions(-)
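For readers skimming the patch: the connector-side changes all follow one pattern. A new job-level option (enable.log.report, default false) is threaded from the table factories through the source builders down to each SourceReader, which installs an OpenTelemetryLogger in start() and removes it in close(). The sketch below is illustrative only and not part of the patch; the reader class is hypothetical, while the OpenTelemetryLogger builder calls are the ones used verbatim in the readers later in this diff.

    // Illustrative only -- mirrors the pattern applied to the Iceberg, Kafka,
    // MySQL-CDC and Pulsar source readers in this diff.
    import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
    import org.apache.logging.log4j.Level;

    public class ExampleSourceReader implements AutoCloseable {

        private final boolean enableLogReport;
        private OpenTelemetryLogger openTelemetryLogger;

        public ExampleSourceReader(boolean enableLogReport, String localHostIp) {
            this.enableLogReport = enableLogReport;
            if (enableLogReport) {
                this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
                        .setLogLevel(Level.ERROR)                   // only ship ERROR-level logs
                        .setServiceName(getClass().getSimpleName()) // reader class as service name
                        .setLocalHostIp(localHostIp)                // from SourceReaderContext#getLocalHostName()
                        .build();
            }
        }

        public void start() {
            if (enableLogReport) {
                openTelemetryLogger.install();   // attach the OpenTelemetry log appender
            }
        }

        @Override
        public void close() {
            if (enableLogReport) {
                openTelemetryLogger.uninstall(); // detach it again on shutdown
            }
        }
    }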
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 878b841076..a58febedfb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -375,6 +375,9 @@ public class Constants {
public static final ConfigOption<String> SQL_SCRIPT_FILE = key("sql.script.file").noDefaultValue()
.withDescription("The file which is sql script and contains multi statement");
+ public static final ConfigOption<Boolean> ENABLE_LOG_REPORT = key("enable.log.report").defaultValue(false)
+ .withDescription("Whether to enable openTelemetry log report or not");
+
// ------------------------------------------------------------------------
// File format and compression related
// ------------------------------------------------------------------------
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index a9744f793c..1c31227329 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -77,6 +77,8 @@ public class Entrance {
config.getString(Constants.UPSERT_MATERIALIZE));
tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
config.getString(Constants.NOT_NULL_ENFORCER));
+ tableEnv.getConfig().getConfiguration().setBoolean(Constants.ENABLE_LOG_REPORT.key(),
+ config.getBoolean(Constants.ENABLE_LOG_REPORT));
String sqlFile = config.getString(Constants.SQL_SCRIPT_FILE);
Parser parser;
if (StringUtils.isEmpty(sqlFile)) {
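Setting the flag on the TableEnvironment configuration (rather than as a per-table DDL option) lets every connector factory created for the job read it from its factory context. The consumer side, shown only as a sketch of what the Kafka, MySQL and Pulsar factories later in this diff actually do:

    // Sketch of the consumer side inside a DynamicTableSourceFactory; Constants here
    // is org.apache.inlong.sort.base.Constants, as in the factory changes below.
    boolean enableLogReport = context.getConfiguration().get(Constants.ENABLE_LOG_REPORT);
    // ... the factory then hands enableLogReport to the Source/SourceReader it builds.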
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
index 70e6b2413e..d6bcca2fa1 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
@@ -34,6 +35,7 @@ import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
import java.io.IOException;
import java.net.URI;
@@ -111,6 +113,15 @@ public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 {
.withNetworkAliases("mysql")
.withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG));
+ @ClassRule
+ public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
+ (OpenTelemetryContainer) new OpenTelemetryContainer()
+ .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
+ "/otel-config.yaml")
+ .withCommand("--config=/otel-config.yaml")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("logcollector");
+
@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
@@ -152,6 +163,10 @@ public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 {
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
+
+ if (OPEN_TELEMETRY_CONTAINER != null) {
+ OPEN_TELEMETRY_CONTAINER.stop();
+ }
}
private void initializeKafkaTable(String topic) {
@@ -223,5 +238,10 @@ public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 {
"test_output1",
3,
60000L);
+ // check log appender
+ String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
+ if (!logs.contains("OpenTelemetryLogger installed")) {
+ throw new Exception("Failure to append logs to OpenTelemetry");
+ }
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
index 6b7a5aa644..e2867d7f9d 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tests;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
@@ -30,6 +31,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.MountableFile;
import java.net.URISyntaxException;
import java.nio.file.Path;
@@ -85,6 +87,15 @@ public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 {
.withNetworkAliases("mysql")
.withLogConsumer(new Slf4jLogConsumer(LOG));
+ @ClassRule
+ public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
+ (OpenTelemetryContainer) new OpenTelemetryContainer()
+ .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
+ "/otel-config.yaml")
+ .withCommand("--config=/otel-config.yaml")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("logcollector");
+
@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
@@ -121,6 +132,9 @@ public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 {
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
+ if (OPEN_TELEMETRY_CONTAINER != null) {
+ OPEN_TELEMETRY_CONTAINER.stop();
+ }
}
/**
@@ -161,6 +175,11 @@ public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 {
expectResult,
"test_output1",
3,
- 60000L);
+ 80000L);
+ // check log appender
+ String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
+ if (!logs.contains("OpenTelemetryLogger installed")) {
+ throw new Exception("Failure to append logs to OpenTelemetry");
+ }
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
index e5252d0b4a..9c40577778 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.tests;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;
@@ -35,6 +36,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
import java.net.URI;
import java.net.URISyntaxException;
@@ -91,6 +93,15 @@ public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
+ @ClassRule
+ public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
+ (OpenTelemetryContainer) new OpenTelemetryContainer()
+ .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"),
+ "/otel-config.yaml")
+ .withCommand("--config=/otel-config.yaml")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("logcollector");
+
@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
@@ -119,6 +130,9 @@ public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
}
+ if (OPEN_TELEMETRY_CONTAINER != null) {
+ OPEN_TELEMETRY_CONTAINER.stop();
+ }
}
@Test
@@ -144,6 +158,11 @@ public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 {
"1,Alice,Hello, Pulsar",
"2,Bob,Goodbye, Pulsar");
proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L);
+ // check log appender
+ String logs = OPEN_TELEMETRY_CONTAINER.getLogs();
+ if (!logs.contains("OpenTelemetryLogger installed")) {
+ throw new Exception("Failure to append logs to OpenTelemetry");
+ }
}
}
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index de6166442e..47ccd67f33 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -130,6 +130,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars)));
commands.add("--sql.script.file");
commands.add(containerSqlFile);
+ commands.add("--enable.log.report true");
ExecResult execResult =
jobManager.execInContainer("bash", "-c", String.join(" ", commands));
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
index a59d9c9e98..83c14a113b 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java
@@ -45,6 +45,7 @@ public abstract class FlinkContainerTestEnvJRE8 extends FlinkContainerTestEnv {
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withExposedPorts(DEBUG_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withEnv("OTEL_EXPORTER_ENDPOINT", "logcollector:4317")
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(TM_LOG));
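The OTEL_EXPORTER_ENDPOINT value matches the collector container's network alias ("logcollector") and OTLP gRPC port (4317) configured below. How OpenTelemetryLogger consumes the variable is not part of this diff; a plausible reading, stated only as an assumption, is:

    // Assumption: endpoint resolution inside the logger. Only the variable name and
    // the logcollector:4317 value come from this patch; the fallback is a guess.
    String endpoint = System.getenv().getOrDefault("OTEL_EXPORTER_ENDPOINT", "localhost:4317");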
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java
new file mode 100644
index 0000000000..0d07e85a9e
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import org.testcontainers.containers.GenericContainer;
+
+public class OpenTelemetryContainer extends GenericContainer {
+
+ public static final String IMAGE = "otel/opentelemetry-collector-contrib:0.110.0";
+ public static final Integer PORT = 4317;
+ public OpenTelemetryContainer() {
+ super(IMAGE);
+ addExposedPort(PORT);
+ }
+}
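All three end-to-end tests above wire this container up identically; below is a self-contained sketch of that usage with the same image, config file, command and alias as in the diff. The locally created Testcontainers network stands in for the shared test-env NETWORK, so the class name and that detail are illustrative.

    // Sketch of the usage shown in Kafka2StarRocksTest / Mysql2StarRocksTest / Pulsar2StarRocksTest.
    import org.junit.ClassRule;
    import org.testcontainers.containers.Network;
    import org.testcontainers.utility.MountableFile;

    public class OpenTelemetryContainerUsageSketch {

        private static final Network NETWORK = Network.newNetwork(); // the real tests reuse the Flink test network

        @ClassRule
        public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER =
                (OpenTelemetryContainer) new OpenTelemetryContainer()
                        .withCopyFileToContainer(
                                MountableFile.forClasspathResource("/env/otel-config.yaml"), "/otel-config.yaml")
                        .withCommand("--config=/otel-config.yaml")
                        .withNetwork(NETWORK)
                        .withNetworkAliases("logcollector");

        // After the Flink job has run, the tests assert that the collector's debug
        // exporter logged the appender installation:
        //     OPEN_TELEMETRY_CONTAINER.getLogs().contains("OpenTelemetryLogger installed")
    }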
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml
new file mode 100644
index 0000000000..ad48b9cb07
--- /dev/null
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: logcollector:4317
+processors:
+ batch:
+
+exporters:
+ debug:
+ verbosity: detailed
+
+service:
+ pipelines:
+ logs:
+ receivers: [otlp]
+ processors: [batch]
+ exporters: [debug]
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
index 9f74d54ae7..6c4a3efb84 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql
@@ -10,7 +10,7 @@ CREATE TABLE test_input1 (
'password' = 'inlong',
'database-name' = 'test',
'table-name' = 'test_input1',
- 'scan.incremental.snapshot.enabled' = 'false',
+ 'scan.incremental.snapshot.enabled' = 'true',
'jdbc.properties.useSSL' = 'false',
'jdbc.properties.allowPublicKeyRetrieval' = 'true'
);
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 7e02b0ca96..d2480be58f 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -469,4 +469,10 @@ public final class Constants {
.withDescription(
"pulsar client auth params");
+ public static final ConfigOption<Boolean> ENABLE_LOG_REPORT =
+ ConfigOptions.key("enable.log.report")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether enable openTelemetry log report
or not.");
+
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
index 7559d70bce..f30e2050b3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
@@ -151,6 +151,8 @@
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
+ <include>io.opentelemetry*</include>
+ <include>com.squareup.*</include>
<include>org.apache.inlong:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.kafka:*</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
index adff22e7cd..7b1b334317 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.iceberg.source;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
import org.apache.inlong.sort.iceberg.source.reader.IcebergSourceReader;
@@ -94,6 +95,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final SplitAssignerFactory assignerFactory;
private final MetricOption metricOption;
+ private final boolean enableLogReport;
// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -105,13 +107,15 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
Table table,
- MetricOption metricOption) {
+ MetricOption metricOption,
+ boolean enableLogReport) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.table = table;
this.metricOption = metricOption;
+ this.enableLogReport = enableLogReport;
}
String name() {
@@ -167,7 +171,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
InlongIcebergSourceReaderMetrics<T> metrics =
new InlongIcebergSourceReaderMetrics<>(readerContext.metricGroup(), lazyTable().name());
metrics.registerMetrics(metricOption);
- return new IcebergSourceReader<>(metrics, readerFunction, readerContext);
+ return new IcebergSourceReader<>(metrics, readerFunction, readerContext, enableLogReport);
}
@Override
@@ -522,9 +526,10 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}
resolveMetricOption();
checkRequired();
+ boolean enableLogReport = flinkConfig.get(Constants.ENABLE_LOG_REPORT);
// Since builder already load the table, pass it to the source to avoid double loading
return new IcebergSource<T>(
- tableLoader, context, readerFunction, splitAssignerFactory, table, metricOption);
+ tableLoader, context, readerFunction, splitAssignerFactory, table, metricOption, enableLogReport);
}
private void checkRequired() {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
index f55f63fadd..7f5298a7bf 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.iceberg.source;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.iceberg.IcebergReadableMetadata;
import org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
@@ -83,6 +84,7 @@ public class IcebergTableSource
private final Map<String, String> properties;
private final boolean isLimitPushDown;
private final ReadableConfig readableConfig;
+ private final boolean enableLogReport;
private IcebergTableSource(IcebergTableSource toCopy) {
this.loader = toCopy.loader;
@@ -95,6 +97,7 @@ public class IcebergTableSource
this.readableConfig = toCopy.readableConfig;
this.producedDataType = toCopy.producedDataType;
this.metadataKeys = toCopy.metadataKeys;
+ this.enableLogReport = toCopy.enableLogReport;
}
public IcebergTableSource(
@@ -124,6 +127,7 @@ public class IcebergTableSource
this.readableConfig = readableConfig;
this.producedDataType = schema.toPhysicalRowDataType();
this.metadataKeys = new ArrayList<>();
+ this.enableLogReport = readableConfig.get(Constants.ENABLE_LOG_REPORT);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index a8ad9b5259..e97adc01d4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.iceberg.source.reader;
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
@@ -25,6 +27,7 @@ import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.logging.log4j.Level;
import java.util.Collection;
import java.util.Collections;
@@ -40,20 +43,34 @@ public class IcebergSourceReader<T>
SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {
private final InlongIcebergSourceReaderMetrics<T> metrics;
+ private OpenTelemetryLogger openTelemetryLogger;
+ private final boolean enableLogReport;
+
public IcebergSourceReader(
InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> readerFunction,
- SourceReaderContext context) {
+ SourceReaderContext context,
+ boolean enableLogReport) {
super(
() -> new IcebergSourceSplitReader<>(metrics, readerFunction, context),
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
this.metrics = metrics;
+ this.enableLogReport = enableLogReport;
+ if (this.enableLogReport) {
+ this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+ .setLogLevel(Level.ERROR)
+ .setServiceName(this.getClass().getSimpleName())
+ .setLocalHostIp(this.context.getLocalHostName()).build();
+ }
}
@Override
public void start() {
+ if (this.enableLogReport) {
+ this.openTelemetryLogger.install();
+ }
// We request a split only if we did not get splits during the checkpoint restore.
// Otherwise, reader restarts will keep requesting more and more splits.
if (getNumberOfCurrentlyAssignedSplits() == 0) {
@@ -61,6 +78,14 @@ public class IcebergSourceReader<T>
}
}
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (this.enableLogReport) {
+ openTelemetryLogger.uninstall();
+ }
+ }
+
@Override
protected void onSplitFinished(Map<String, IcebergSourceSplit> finishedSplitIds) {
requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index 23c24f0f2e..38cf7d6cb4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -76,6 +76,8 @@
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
+ <include>io.opentelemetry*</include>
+ <include>com.squareup.*</include>
<include>org.apache.inlong:*</include>
<include>org.apache.kafka:*</include>
<include>org.apache.flink:flink-connector-kafka</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
index 5004ec34a3..7fe014c54c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -101,6 +101,7 @@ public class KafkaSource<OUT>
private final KafkaDeserializationSchema<RowData> metricSchema;
// The configurations.
private final Properties props;
+ private final boolean enableLogReport;
KafkaSource(
KafkaSubscriber subscriber,
@@ -109,7 +110,8 @@ public class KafkaSource<OUT>
Boundedness boundedness,
KafkaRecordDeserializationSchema<OUT> deserializationSchema,
KafkaDeserializationSchema<RowData> metricSchema,
- Properties props) {
+ Properties props,
+ boolean enableLogReport) {
this.subscriber = subscriber;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
@@ -117,6 +119,7 @@ public class KafkaSource<OUT>
this.deserializationSchema = deserializationSchema;
this.metricSchema = metricSchema;
this.props = props;
+ this.enableLogReport = enableLogReport;
}
/**
@@ -175,7 +178,8 @@ public class KafkaSource<OUT>
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics,
- metricSchema);
+ metricSchema,
+ enableLogReport);
}
@Internal
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
index 58bb651b24..7e0422e481 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
@@ -105,6 +105,7 @@ public class KafkaSourceBuilder<OUT> {
private KafkaDeserializationSchema<RowData> metricSchema;
// The configurations.
protected Properties props;
+ private boolean enableLogReport;
KafkaSourceBuilder() {
this.subscriber = null;
@@ -407,6 +408,11 @@ public class KafkaSourceBuilder<OUT> {
return this;
}
+ public KafkaSourceBuilder<OUT> enableLogReport(boolean enableLogReport) {
+ this.enableLogReport = enableLogReport;
+ return this;
+ }
+
/**
* Build the {@link KafkaSource}.
*
@@ -422,7 +428,8 @@ public class KafkaSourceBuilder<OUT> {
boundedness,
deserializationSchema,
metricSchema,
- props);
+ props,
+ enableLogReport);
}
// ------------- private helpers --------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
index 4643887c49..9474269a8a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.kafka.source.reader;
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
import org.apache.flink.annotation.Internal;
@@ -37,6 +38,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +68,8 @@ public class KafkaSourceReader<T>
private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
private final boolean commitOffsetsOnCheckpoint;
private final KafkaDeserializationSchema<RowData> metricSchema;
+ private OpenTelemetryLogger openTelemetryLogger;
+ private final boolean enableLogReport;
public KafkaSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue,
@@ -74,7 +78,8 @@ public class KafkaSourceReader<T>
Configuration config,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
- KafkaDeserializationSchema<RowData> metricSchema) {
+ KafkaDeserializationSchema<RowData> metricSchema,
+ boolean enableLogReport) {
super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context);
this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
@@ -82,11 +87,34 @@ public class KafkaSourceReader<T>
this.commitOffsetsOnCheckpoint = config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
this.metricSchema = metricSchema;
+ this.enableLogReport = enableLogReport;
if (!commitOffsetsOnCheckpoint) {
LOG.warn(
"Offset commit on checkpoint is disabled. "
+ "Consuming offset will not be reported back to Kafka cluster.");
}
+ if (this.enableLogReport) {
+ this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+ .setLogLevel(Level.ERROR)
+ .setServiceName(this.getClass().getSimpleName())
+ .setLocalHostIp(this.context.getLocalHostName()).build();
+ }
+ }
+
+ @Override
+ public void start() {
+ if (this.enableLogReport) {
+ this.openTelemetryLogger.install();
+ }
+ super.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (this.enableLogReport) {
+ openTelemetryLogger.uninstall();
+ }
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 9b0b0aff64..ef7e34778c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -163,6 +163,8 @@ public class KafkaDynamicSource
private final MetricOption metricOption;
+ private final boolean enableLogReport;
+
public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -178,7 +180,8 @@ public class KafkaDynamicSource
long startupTimestampMillis,
boolean upsertMode,
String tableIdentifier,
- MetricOption metricOption) {
+ MetricOption metricOption,
+ boolean enableLogReport) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -213,6 +216,7 @@ public class KafkaDynamicSource
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
this.metricOption = metricOption;
+ this.enableLogReport = enableLogReport;
}
@Override
@@ -328,7 +332,8 @@ public class KafkaDynamicSource
startupTimestampMillis,
upsertMode,
tableIdentifier,
- metricOption);
+ metricOption,
+ enableLogReport);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -366,7 +371,8 @@ public class KafkaDynamicSource
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy)
- && Objects.equals(metricOption, that.metricOption);
+ && Objects.equals(metricOption, that.metricOption)
+ && Objects.equals(enableLogReport, that.enableLogReport);
}
@Override
@@ -389,7 +395,8 @@ public class KafkaDynamicSource
upsertMode,
tableIdentifier,
watermarkStrategy,
- metricOption);
+ metricOption,
+ enableLogReport);
}
// --------------------------------------------------------------------------------------------
@@ -443,7 +450,8 @@ public class KafkaDynamicSource
kafkaSourceBuilder
.setProperties(properties)
.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer))
- .setMetricSchema(kafkaDeserializer);
+ .setMetricSchema(kafkaDeserializer)
+ .enableLogReport(enableLogReport);
return kafkaSourceBuilder.build();
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 3a320b7f88..1070039174 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -200,6 +200,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
String auditKeys = tableOptions.get(AUDIT_KEYS);
+ Boolean enableLogReport = context.getConfiguration().get(ENABLE_LOG_REPORT);
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
@@ -220,7 +221,8 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis,
context.getObjectIdentifier().asSummaryString(),
- metricOption);
+ metricOption,
+ enableLogReport);
}
@Override
@@ -387,7 +389,8 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
String tableIdentifier,
- MetricOption metricOption) {
+ MetricOption metricOption,
+ boolean enableLogReport) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -403,7 +406,8 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
startupTimestampMillis,
false,
tableIdentifier,
- metricOption);
+ metricOption,
+ enableLogReport);
}
protected KafkaDynamicSink createKafkaTableSink(
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6a8f5e20c9..2173ee5718 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -133,7 +133,7 @@ public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory
String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
String auditKeys = tableOptions.get(AUDIT_KEYS);
-
+ Boolean enableLogReport = context.getConfiguration().get(ENABLE_LOG_REPORT);
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withAuditAddress(auditHostAndPorts)
@@ -155,7 +155,8 @@ public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory
0,
true,
context.getObjectIdentifier().asSummaryString(),
- metricOption);
+ metricOption,
+ enableLogReport);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
index 3e080b5b8b..687d8f4f0f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml
@@ -95,6 +95,8 @@
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
+ <include>io.opentelemetry*</include>
+ <include>com.squareup.*</include>
<include>org.apache.inlong:*</include>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
index 55aec6ed1c..3273f3ee2d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
@@ -86,6 +86,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
private final String chunkKeyColumn;
+ private final boolean enableLogReport;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -121,6 +122,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
double distributionFactorLower,
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
+ boolean enableLogReport,
Properties jdbcProperties,
Duration heartbeatInterval,
@Nullable String chunkKeyColumn,
@@ -136,6 +138,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.enableParallelRead = enableParallelRead;
+ this.enableLogReport = enableLogReport;
this.splitSize = splitSize;
this.splitMetaGroupSize = splitMetaGroupSize;
this.fetchSize = fetchSize;
@@ -206,6 +209,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+ .enableLogReport(enableLogReport)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.chunkKeyColumn(chunkKeyColumn)
@@ -285,6 +289,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
+ enableLogReport,
jdbcProperties,
heartbeatInterval,
chunkKeyColumn,
@@ -311,6 +316,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& distributionFactorUpper == that.distributionFactorUpper
&& distributionFactorLower == that.distributionFactorLower
&& scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
+ && enableLogReport == that.enableLogReport
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
@@ -358,6 +364,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled,
+ enableLogReport,
jdbcProperties,
heartbeatInterval,
chunkKeyColumn,
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index f903780a36..0dc854dfa2 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.mysql;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
@@ -69,6 +70,7 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
final String username = config.get(USERNAME);
final String password = config.get(PASSWORD);
final String databaseName = config.get(DATABASE_NAME);
+ final Boolean enableLogReport = context.getConfiguration().get(Constants.ENABLE_LOG_REPORT);
validateRegex(DATABASE_NAME.key(), databaseName);
final String tableName = config.get(TABLE_NAME);
validateRegex(TABLE_NAME.key(), tableName);
@@ -129,6 +131,7 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
+ enableLogReport,
getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
chunkKeyColumn,
@@ -337,7 +340,6 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
+ "\"-U\" represents UPDATE_BEFORE.\n"
+ "\"+U\" represents UPDATE_AFTER.\n"
+ "\"-D\" represents DELETE.");
-
// ----------------------------------------------------------------------------
// experimental options, won't add them to documentation
// ----------------------------------------------------------------------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
index da1dcbadae..3d4af2caae 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
@@ -103,6 +103,7 @@ public class MySqlSource<T>
private final MySqlSourceConfigFactory configFactory;
private final DebeziumDeserializationSchema<T> deserializationSchema;
+ private final boolean enableLogReport;
/**
* Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}.
@@ -116,9 +117,11 @@ public class MySqlSource<T>
MySqlSource(
MySqlSourceConfigFactory configFactory,
- DebeziumDeserializationSchema<T> deserializationSchema) {
+ DebeziumDeserializationSchema<T> deserializationSchema,
+ boolean enableLogReport) {
this.configFactory = configFactory;
this.deserializationSchema = deserializationSchema;
+ this.enableLogReport = enableLogReport;
}
public MySqlSourceConfigFactory getConfigFactory() {
@@ -168,7 +171,8 @@ public class MySqlSource<T>
readerContext.getConfiguration(),
mySqlSourceReaderContext,
sourceConfig,
- deserializationSchema);
+ deserializationSchema,
+ enableLogReport);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
index 775da14035..3b4b7ff811 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
@@ -51,6 +51,7 @@ public class MySqlSourceBuilder<T> {
private final MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory();
private DebeziumDeserializationSchema<T> deserializer;
+ private boolean enableLogReport;
public MySqlSourceBuilder<T> hostname(String hostname) {
this.configFactory.hostname(hostname);
@@ -235,12 +236,17 @@ public class MySqlSourceBuilder<T> {
return this;
}
+ public MySqlSourceBuilder<T> enableLogReport(boolean enableLogReport) {
+ this.enableLogReport = enableLogReport;
+ return this;
+ }
+
/**
* Build the MySqlSource
*
* @return a MySqlParallelSource with the settings made for this builder.
*/
public MySqlSource<T> build() {
- return new MySqlSource<>(configFactory, checkNotNull(deserializer));
+ return new MySqlSource<>(configFactory, checkNotNull(deserializer), enableLogReport);
}
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
index 01f34f28b1..6fbd7b96ba 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.mysql.source.reader;
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
@@ -55,6 +56,7 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +90,8 @@ public class MySqlSourceReader<T>
private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedBinlogSplit;
private final DebeziumDeserializationSchema<T> metricSchema;
+ private OpenTelemetryLogger openTelemetryLogger;
+ private final boolean enableLogReport;
public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
@@ -95,7 +99,8 @@ public class MySqlSourceReader<T>
RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
Configuration config,
MySqlSourceReaderContext context,
- MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema<T> metricSchema) {
+ MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema<T> metricSchema,
+ boolean enableLogReport) {
super(
elementQueue,
new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
@@ -109,15 +114,33 @@ public class MySqlSourceReader<T>
this.mySqlSourceReaderContext = context;
this.suspendedBinlogSplit = null;
this.metricSchema = metricSchema;
+ this.enableLogReport = enableLogReport;
+ if (enableLogReport) {
+ this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+ .setLogLevel(Level.ERROR)
+ .setServiceName(this.getClass().getSimpleName())
+ .setLocalHostIp(this.context.getLocalHostName()).build();
+ }
}
@Override
public void start() {
+ if (enableLogReport) {
+ openTelemetryLogger.install();
+ }
if (getNumberOfCurrentlyAssignedSplits() == 0) {
context.sendSplitRequest();
}
}
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (enableLogReport) {
+ openTelemetryLogger.uninstall();
+ }
+ }
+
@Override
protected MySqlSplitState initializedState(MySqlSplit split) {
if (split.isSnapshotSplit()) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index 3e6a1b3ee2..97cc31232e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -242,6 +242,8 @@
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<artifactSet>
<includes>
+ <include>io.opentelemetry*</include>
+ <include>com.squareup.*</include>
<include>org.apache.inlong:*</include>
<include>io.streamnative.connectors:pulsar-flink-connector-origin*</include>
<include>io.streamnative.connectors:flink-protobuf</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 780dfde8b5..7fa2a4cddc 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -48,9 +48,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
-import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.*;
import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createKeyFormatProjection;
import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createValueFormatProjection;
import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getKeyDecodingFormat;
@@ -125,7 +123,7 @@ public class PulsarTableFactory implements DynamicTableSourceFactory {
final StartCursor startCursor = getStartCursor(tableOptions);
final StopCursor stopCursor = getStopCursor(tableOptions);
final SubscriptionType subscriptionType = getSubscriptionType(tableOptions);
-
+ final boolean enableLogReport = context.getConfiguration().get(ENABLE_LOG_REPORT);
// Forward source configs
final Properties properties = getPulsarProperties(tableOptions);
properties.setProperty(PULSAR_ADMIN_URL.key(), tableOptions.get(ADMIN_URL));
@@ -163,7 +161,6 @@ public class PulsarTableFactory implements DynamicTableSourceFactory {
final DecodingFormat<DeserializationSchema<RowData>> decodingFormatForMetadataPushdown =
valueDecodingFormat;
final ChangelogMode changelogMode = decodingFormatForMetadataPushdown.getChangelogMode();
-
return new PulsarTableSource(
deserializationSchemaFactory,
decodingFormatForMetadataPushdown,
@@ -172,7 +169,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory {
properties,
startCursor,
stopCursor,
- subscriptionType);
+ subscriptionType,
+ enableLogReport);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
index b7b608ec82..756d2cb1a8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
@@ -91,6 +91,8 @@ public final class PulsarSource<OUT>
/** The pulsar deserialization schema used for deserializing message. */
private final PulsarDeserializationSchema<OUT> deserializationSchema;
+ private final boolean enableLogReport;
+
/**
* The constructor for PulsarSource, it's package protected for forcing using {@link
* PulsarSourceBuilder}.
@@ -102,7 +104,8 @@ public final class PulsarSource<OUT>
StartCursor startCursor,
StopCursor stopCursor,
Boundedness boundedness,
- PulsarDeserializationSchema<OUT> deserializationSchema) {
+ PulsarDeserializationSchema<OUT> deserializationSchema,
+ boolean enableLogReport) {
this.sourceConfiguration = sourceConfiguration;
this.subscriber = subscriber;
this.rangeGenerator = rangeGenerator;
@@ -110,6 +113,7 @@ public final class PulsarSource<OUT>
this.stopCursor = stopCursor;
this.boundedness = boundedness;
this.deserializationSchema = deserializationSchema;
+ this.enableLogReport = enableLogReport;
}
/**
@@ -136,7 +140,7 @@ public final class PulsarSource<OUT>
deserializationSchema.open(initializationContext, sourceConfiguration);
return PulsarSourceReaderFactory.create(
- readerContext, deserializationSchema, sourceConfiguration);
+ readerContext, deserializationSchema, sourceConfiguration, enableLogReport);
}
@Internal
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
index d8c05ce152..f783aa3bcf 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
@@ -127,6 +127,7 @@ public final class PulsarSourceBuilder<OUT> {
private StopCursor stopCursor;
private Boundedness boundedness;
private PulsarDeserializationSchema<OUT> deserializationSchema;
+ private boolean enableLogReport;
// private builder constructor.
PulsarSourceBuilder() {
@@ -412,6 +413,11 @@ public final class PulsarSourceBuilder<OUT> {
return this;
}
+ public PulsarSourceBuilder<OUT> enableLogReport(boolean enableLogReport) {
+ this.enableLogReport = enableLogReport;
+ return this;
+ }
+
/**
* Build the {@link PulsarSource}.
*
@@ -498,7 +504,8 @@ public final class PulsarSourceBuilder<OUT> {
startCursor,
stopCursor,
boundedness,
- deserializationSchema);
+ deserializationSchema,
+ enableLogReport);
}
// ------------- private helpers --------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
index bbc42c149f..3bccfda1f3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
@@ -64,7 +64,8 @@ public final class PulsarSourceReaderFactory {
public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(
SourceReaderContext readerContext,
PulsarDeserializationSchema<OUT> deserializationSchema,
- SourceConfiguration sourceConfiguration) {
+ SourceConfiguration sourceConfiguration,
+ boolean enableLogReport) {
PulsarClient pulsarClient = createClient(sourceConfiguration);
PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
@@ -93,7 +94,8 @@ public final class PulsarSourceReaderFactory {
sourceConfiguration,
pulsarClient,
pulsarAdmin,
- deserializationSchema);
+ deserializationSchema,
+ enableLogReport);
} else if (subscriptionType == SubscriptionType.Shared
|| subscriptionType == SubscriptionType.Key_Shared) {
TransactionCoordinatorClient coordinatorClient =
@@ -119,7 +121,8 @@ public final class PulsarSourceReaderFactory {
pulsarClient,
pulsarAdmin,
coordinatorClient,
- deserializationSchema);
+ deserializationSchema,
+ enableLogReport);
} else {
throw new UnsupportedOperationException(
"This subscription type is not " + subscriptionType + "
supported currently.");
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
index 3c75793f93..75ac35c025 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
@@ -83,14 +83,16 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
SourceConfiguration sourceConfiguration,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
- PulsarDeserializationSchema<OUT> deserializationSchema) {
+ PulsarDeserializationSchema<OUT> deserializationSchema,
+ boolean enableLogReport) {
super(
elementsQueue,
new PulsarOrderedFetcherManager<>(elementsQueue, splitReaderSupplier::get),
context,
sourceConfiguration,
pulsarClient,
- pulsarAdmin);
+ pulsarAdmin,
+ enableLogReport);
this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
index f7e6bafc1d..3d9bc6a517 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.pulsar.source.reader;
+import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
+
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
@@ -44,6 +46,8 @@ abstract class PulsarSourceReaderBase<OUT>
protected final SourceConfiguration sourceConfiguration;
protected final PulsarClient pulsarClient;
protected final PulsarAdmin pulsarAdmin;
+ private OpenTelemetryLogger openTelemetryLogger;
+ protected final boolean enableLogReport;
protected PulsarSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
@@ -51,7 +55,8 @@ abstract class PulsarSourceReaderBase<OUT>
SourceReaderContext context,
SourceConfiguration sourceConfiguration,
PulsarClient pulsarClient,
- PulsarAdmin pulsarAdmin) {
+ PulsarAdmin pulsarAdmin,
+ boolean enableLogReport) {
super(
elementsQueue,
splitFetcherManager,
@@ -62,6 +67,13 @@ abstract class PulsarSourceReaderBase<OUT>
this.sourceConfiguration = sourceConfiguration;
this.pulsarClient = pulsarClient;
this.pulsarAdmin = pulsarAdmin;
+ this.enableLogReport = enableLogReport;
+ if (enableLogReport) {
+ this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
+ .setLogLevel(org.apache.logging.log4j.Level.ERROR)
+ .setServiceName(this.getClass().getSimpleName())
+ .setLocalHostIp(this.context.getLocalHostName()).build();
+ }
}
@Override
@@ -75,6 +87,14 @@ abstract class PulsarSourceReaderBase<OUT>
return splitState.toPulsarPartitionSplit();
}
+ @Override
+ public void start() {
+ if (enableLogReport) {
+ this.openTelemetryLogger.install();
+ }
+ super.start();
+ }
+
@Override
public void close() throws Exception {
// Close the all the consumers first.
@@ -83,5 +103,8 @@ abstract class PulsarSourceReaderBase<OUT>
// Close shared pulsar resources.
pulsarClient.shutdown();
pulsarAdmin.close();
+ if (enableLogReport) {
+ openTelemetryLogger.uninstall();
+ }
}
}
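Note on the reader changes above: PulsarSourceReaderBase now builds the OpenTelemetryLogger in its constructor only when enableLogReport is true, installs it in start(), and uninstalls it in close(). Below is a minimal standalone sketch of that install/uninstall lifecycle, assuming a hypothetical class name and an InetAddress lookup in place of SourceReaderContext#getLocalHostName().

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;

import org.apache.logging.log4j.Level;

import java.net.InetAddress;

public class OpenTelemetryLoggerLifecycleSketch {

    public static void main(String[] args) throws Exception {
        // Built with the same settings PulsarSourceReaderBase uses when enableLogReport is true.
        OpenTelemetryLogger otelLogger = new OpenTelemetryLogger.Builder()
                .setLogLevel(Level.ERROR)
                .setServiceName("OpenTelemetryLoggerLifecycleSketch")
                .setLocalHostIp(InetAddress.getLocalHost().getHostAddress())
                .build();

        // install() is what the readers call from start().
        otelLogger.install();
        try {
            // ... a real reader would poll splits and emit records here ...
        } finally {
            // uninstall() is what the readers call from close().
            otelLogger.uninstall();
        }
    }
}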
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
index adf15de0b1..f30d7b3477 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
@@ -81,14 +81,16 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
@Nullable TransactionCoordinatorClient coordinatorClient,
- PulsarDeserializationSchema<OUT> deserializationSchema) {
+ PulsarDeserializationSchema<OUT> deserializationSchema,
+ boolean enableLogReport) {
super(
elementsQueue,
new PulsarUnorderedFetcherManager<>(elementsQueue, splitReaderSupplier::get),
context,
sourceConfiguration,
pulsarClient,
- pulsarAdmin);
+ pulsarAdmin,
+ enableLogReport);
this.coordinatorClient = coordinatorClient;
this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>());
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
index e5df548650..cd73a7bfce 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -86,6 +86,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
private final SubscriptionType subscriptionType;
+ private final boolean enableLogReport;
+
public PulsarTableSource(
PulsarTableDeserializationSchemaFactory deserializationSchemaFactory,
DecodingFormat<DeserializationSchema<RowData>> decodingFormatForReadingMetadata,
@@ -94,7 +96,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
Properties properties,
StartCursor startCursor,
StopCursor stopCursor,
- SubscriptionType subscriptionType) {
+ SubscriptionType subscriptionType,
+ boolean enableLogReport) {
// Format attributes
this.deserializationSchemaFactory = checkNotNull(deserializationSchemaFactory);
this.decodingFormatForReadingMetadata = checkNotNull(decodingFormatForReadingMetadata);
@@ -105,6 +108,7 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
this.startCursor = checkNotNull(startCursor);
this.stopCursor = checkNotNull(stopCursor);
this.subscriptionType = checkNotNull(subscriptionType);
+ this.enableLogReport = enableLogReport;
}
@Override
@@ -127,6 +131,7 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
// only support exclusive since shared mode requires pulsar with transaction enabled
// and supporting transaction consumes more resources in pulsar broker
.setSubscriptionType(SubscriptionType.Exclusive)
+ .enableLogReport(enableLogReport)
.build();
return SourceProvider.of(source);
}
@@ -194,7 +199,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
properties,
startCursor,
stopCursor,
- subscriptionType);
+ subscriptionType,
+ enableLogReport);
}
@Override
@@ -215,7 +221,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
&& Objects.equals(properties, that.properties)
&& Objects.equals(startCursor, that.startCursor)
&& Objects.equals(stopCursor, that.stopCursor)
- && subscriptionType == that.subscriptionType;
+ && subscriptionType == that.subscriptionType
+ && Objects.equals(enableLogReport, that.enableLogReport);
}
@Override
@@ -228,6 +235,7 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
properties,
startCursor,
stopCursor,
- subscriptionType);
+ subscriptionType,
+ enableLogReport);
}
}