This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 133fd52216 [INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation (#11721)
133fd52216 is described below
commit 133fd52216f5fec055035b809440589b1a139d69
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Feb 7 18:27:21 2025 +0800
[INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation (#11721)
* [INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation
* [INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 1 -
.../inlong/sdk/dataproxy/MsgSenderFactory.java | 2 +-
.../sdk/dataproxy/MsgSenderMultiFactory.java | 6 ++---
.../sdk/dataproxy/MsgSenderSingleFactory.java | 28 +++++++++++++++-------
4 files changed, 23 insertions(+), 14 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 923ebd2b9f..72c1a6ac67 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -67,7 +67,6 @@ public class BaseMsgSenderFactory {
public void close() {
int totalSenderCnt;
- int totalTDBankCnt;
logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
senderCacheLock.writeLock().lock();
try {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
index d2169d7152..7fbae90964 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -37,7 +37,7 @@ public interface MsgSenderFactory {
* Shutdown all senders at the factory
*
*/
- void shutdownAll() throws ProxySdkException;
+ void shutdownAll();
/**
* Remove the specified sender from the factory
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index f42d8b4d61..bcf32defd3 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -46,9 +46,9 @@ public class MsgSenderMultiFactory implements MsgSenderFactory {
}
@Override
- public void shutdownAll() throws ProxySdkException {
- if (!this.initialized.get()) {
- throw new ProxySdkException("Please initialize the factory first!");
+ public void shutdownAll() {
+ if (!this.initialized.compareAndSet(true, false)) {
+ return;
}
this.baseMsgSenderFactory.close();
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index ad891c971b..3ca9821c43 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -36,14 +36,18 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class MsgSenderSingleFactory implements MsgSenderFactory {
+ private static final AtomicLong refCounter = new AtomicLong(0);
private static final AtomicBoolean initialized = new AtomicBoolean(false);
private static final AtomicLong singletonRefCounter = new AtomicLong(0);
private static BaseMsgSenderFactory baseMsgSenderFactory;
public MsgSenderSingleFactory() {
- if (singletonRefCounter.incrementAndGet() == 1) {
- baseMsgSenderFactory = new BaseMsgSenderFactory(this, "iSingleFct");
- initialized.set(true);
+ synchronized (singletonRefCounter) {
+ if (singletonRefCounter.incrementAndGet() == 1) {
+ baseMsgSenderFactory = new BaseMsgSenderFactory(
+ this, "iSingleFct-" + refCounter.incrementAndGet());
+ initialized.set(true);
+ }
}
while (!initialized.get()) {
ProxyUtils.sleepSomeTime(50L);
@@ -51,14 +55,20 @@ public class MsgSenderSingleFactory implements MsgSenderFactory {
}
@Override
- public void shutdownAll() throws ProxySdkException {
- if (!initialized.get()) {
- throw new ProxySdkException("Please initialize the factory first!");
+ public void shutdownAll() {
+ BaseMsgSenderFactory tmpFactory;
+ synchronized (singletonRefCounter) {
+ if (!initialized.get()
+ || singletonRefCounter.decrementAndGet() > 0) {
+ return;
+ }
+ tmpFactory = baseMsgSenderFactory;
+ baseMsgSenderFactory = null;
+ initialized.set(false);
}
- if (singletonRefCounter.decrementAndGet() > 0) {
- return;
+ if (tmpFactory != null) {
+ tmpFactory.close();
}
- baseMsgSenderFactory.close();
}
@Override