This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f0a79bcb36 [INLONG-11629][SDK] Adjust the Sender initialization logic (#11630)
f0a79bcb36 is described below
commit f0a79bcb360675a0c290740ed89d528304c3e78f
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jan 2 15:33:48 2025 +0800
[INLONG-11629][SDK] Adjust the Sender initialization logic (#11630)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/network/Sender.java | 32 +++++++++++-----------
1 file changed, 16 insertions(+), 16 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index b56781c0ce..eb320b4a2a 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -82,15 +82,28 @@ public class Sender {
this.asyncCallbackMaxSize = configure.getTotalAsyncCallbackSize();
this.threadPool = Executors.newCachedThreadPool();
this.clientMgr = new ClientMgr(configure, this, selfDefineFactory);
+ this.scanThread = new TimeoutScanThread(this, configure);
+ if (configure.isEnableMetric()) {
+ metricWorker = new MetricWorkerThread(configure, this);
+ }
+ logger.info("Sender({}) instance initialized!", this.instanceId);
+ }
+
+ public void start() throws Exception {
+ if (!started.compareAndSet(false, true)) {
+ return;
+ }
+ this.clientMgr.start();
+ this.scanThread.start();
ProxyConfigEntry proxyConfigEntry;
try {
proxyConfigEntry = this.clientMgr.getGroupIdConfigure();
setClusterId(proxyConfigEntry.getClusterId());
- } catch (Throwable e) {
+ } catch (Throwable ex) {
if (configure.isOnlyUseLocalProxyConfig()) {
- throw new Exception("Get local proxy configure failure!", e.getCause());
+ throw new Exception("Get local proxy configure failure!", ex);
} else {
- throw new Exception("Visit manager error!", e.getCause());
+ throw new Exception("Visit manager error!", ex);
}
}
if (!proxyConfigEntry.isInterVisit()) {
@@ -101,19 +114,6 @@ public class Sender {
throw new Exception("In OutNetwork isNeedDataEncry must be true!");
}
}
- scanThread = new TimeoutScanThread(this, configure);
- if (configure.isEnableMetric()) {
- metricWorker = new MetricWorkerThread(configure, this);
- }
- logger.info("Sender({}) instance initialized!", this.instanceId);
- }
-
- public void start() {
- if (!started.compareAndSet(false, true)) {
- return;
- }
- this.clientMgr.start();
- this.scanThread.start();
if (this.configure.isEnableMetric()) {
this.metricWorker.start();
}