This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 133fd52216 [INLONG-11720][SDK] Optimize MsgSenderSingleFactory 
implementation (#11721)
133fd52216 is described below

commit 133fd52216f5fec055035b809440589b1a139d69
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Feb 7 18:27:21 2025 +0800

    [INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation (#11721)
    
    * [INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation
    
    * [INLONG-11720][SDK] Optimize MsgSenderSingleFactory implementation
    
    ---------
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java |  1 -
 .../inlong/sdk/dataproxy/MsgSenderFactory.java     |  2 +-
 .../sdk/dataproxy/MsgSenderMultiFactory.java       |  6 ++---
 .../sdk/dataproxy/MsgSenderSingleFactory.java      | 28 +++++++++++++++-------
 4 files changed, 23 insertions(+), 14 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 923ebd2b9f..72c1a6ac67 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -67,7 +67,6 @@ public class BaseMsgSenderFactory {
 
     public void close() {
         int totalSenderCnt;
-        int totalTDBankCnt;
         logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
         senderCacheLock.writeLock().lock();
         try {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
index d2169d7152..7fbae90964 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -37,7 +37,7 @@ public interface MsgSenderFactory {
      * Shutdown all senders at the factory
      *
      */
-    void shutdownAll() throws ProxySdkException;
+    void shutdownAll();
 
     /**
      * Remove the specified sender from the factory
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index f42d8b4d61..bcf32defd3 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -46,9 +46,9 @@ public class MsgSenderMultiFactory implements 
MsgSenderFactory {
     }
 
     @Override
-    public void shutdownAll() throws ProxySdkException {
-        if (!this.initialized.get()) {
-            throw new ProxySdkException("Please initialize the factory 
first!");
+    public void shutdownAll() {
+        if (!this.initialized.compareAndSet(true, false)) {
+            return;
         }
         this.baseMsgSenderFactory.close();
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index ad891c971b..3ca9821c43 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -36,14 +36,18 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class MsgSenderSingleFactory implements MsgSenderFactory {
 
+    private static final AtomicLong refCounter = new AtomicLong(0);
     private static final AtomicBoolean initialized = new AtomicBoolean(false);
     private static final AtomicLong singletonRefCounter = new AtomicLong(0);
     private static BaseMsgSenderFactory baseMsgSenderFactory;
 
     public MsgSenderSingleFactory() {
-        if (singletonRefCounter.incrementAndGet() == 1) {
-            baseMsgSenderFactory = new BaseMsgSenderFactory(this, 
"iSingleFct");
-            initialized.set(true);
+        synchronized (singletonRefCounter) {
+            if (singletonRefCounter.incrementAndGet() == 1) {
+                baseMsgSenderFactory = new BaseMsgSenderFactory(
+                        this, "iSingleFct-" + refCounter.incrementAndGet());
+                initialized.set(true);
+            }
         }
         while (!initialized.get()) {
             ProxyUtils.sleepSomeTime(50L);
@@ -51,14 +55,20 @@ public class MsgSenderSingleFactory implements 
MsgSenderFactory {
     }
 
     @Override
-    public void shutdownAll() throws ProxySdkException {
-        if (!initialized.get()) {
-            throw new ProxySdkException("Please initialize the factory 
first!");
+    public void shutdownAll() {
+        BaseMsgSenderFactory tmpFactory;
+        synchronized (singletonRefCounter) {
+            if (!initialized.get()
+                    || singletonRefCounter.decrementAndGet() > 0) {
+                return;
+            }
+            tmpFactory = baseMsgSenderFactory;
+            baseMsgSenderFactory = null;
+            initialized.set(false);
         }
-        if (singletonRefCounter.decrementAndGet() > 0) {
-            return;
+        if (tmpFactory != null) {
+            tmpFactory.close();
         }
-        baseMsgSenderFactory.close();
     }
 
     @Override

Reply via email to