This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d66bd13904 [INLONG-11683][SDK] Optimize the functions return of the 
ProxyConfigManager class (#11684)
d66bd13904 is described below

commit d66bd139040a4e1c1f041e02c50885da169ec6cb
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jan 20 09:45:21 2025 +0800

    [INLONG-11683][SDK] Optimize the functions return of the ProxyConfigManager 
class (#11684)
    
    * [INLONG-11683][SDK] Optimize the functions return of the 
ProxyConfigManager class
    
    * [INLONG-11683][SDK] Optimize the functions return of the 
ProxyConfigManager class
    
    ---------
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |  18 +-
 .../inlong/sdk/dataproxy/common/ErrorCode.java     |  96 +++++
 .../inlong/sdk/dataproxy/common/ProcessResult.java | 106 ++++++
 .../inlong/sdk/dataproxy/common/SdkConsts.java     |   2 -
 .../sdk/dataproxy/config/ProxyConfigManager.java   | 395 +++++++++++----------
 .../inlong/sdk/dataproxy/network/ClientMgr.java    |  21 +-
 .../sdk/dataproxy/network/HttpProxySender.java     |  24 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  20 ++
 .../sdk/dataproxy/ProxyConfigManagerTest.java      |   7 +-
 inlong-sdk/pom.xml                                 |   2 +-
 10 files changed, 467 insertions(+), 224 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index b887c399e5..290b856ebe 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.dataproxy;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.util.MessageUtils;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
@@ -30,7 +31,6 @@ import org.apache.inlong.sdk.dataproxy.network.Sender;
 import org.apache.inlong.sdk.dataproxy.network.SequentialID;
 import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
-import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,20 +102,20 @@ public class DefaultMessageSender implements 
MessageSender {
             ThreadFactory selfDefineFactory) throws Exception {
         LOGGER.info("Initial tcp sender, configure is {}", tcpConfig);
         // initial sender object
+        ProcessResult procResult = new ProcessResult();
         ProxyConfigManager proxyConfigManager = new 
ProxyConfigManager(tcpConfig);
-        Tuple2<ProxyConfigEntry, String> result =
-                proxyConfigManager.getGroupIdConfigure(true);
-        if (result.getF0() == null) {
-            throw new Exception(result.getF1());
+        if (!proxyConfigManager.getGroupIdConfigure(true, procResult)) {
+            throw new Exception(procResult.toString());
         }
-        DefaultMessageSender sender = 
CACHE_SENDER.get(result.getF0().getClusterId());
+        ProxyConfigEntry configEntry = (ProxyConfigEntry) 
procResult.getRetData();
+        DefaultMessageSender sender = 
CACHE_SENDER.get(configEntry.getClusterId());
         if (sender != null) {
             return sender;
         } else {
             DefaultMessageSender tmpMessageSender =
                     new DefaultMessageSender(tcpConfig, selfDefineFactory);
-            
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
-            CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
+            
tmpMessageSender.setMaxPacketLength(configEntry.getMaxPacketLength());
+            CACHE_SENDER.put(configEntry.getClusterId(), tmpMessageSender);
             return tmpMessageSender;
         }
     }
@@ -209,7 +209,7 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public String getSDKVersion() {
-        return SdkConsts.PROXY_SDK_VERSION;
+        return ProxyUtils.getJarVersion();
     }
 
     private SendResult attemptSendMessage(Function<Sender, SendResult> 
sendOperation) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
new file mode 100644
index 0000000000..5f4cd0cdc8
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.common;
+
+import org.apache.commons.lang3.math.NumberUtils;
+
+/**
+ * Error Code class
+ *
+ * Used to identify different types of errors
+ */
+public enum ErrorCode {
+
+    OK(0, "Ok"),
+
+    SDK_CLOSED(11, "SDK service closed"),
+    //
+    ILLEGAL_CALL_STATE(21, "Only allowed for meta query"),
+    CONFIGURE_NOT_INITIALIZED(22, "Configure not initialized"),
+    FREQUENT_RMT_FAILURE_VISIT(23, "Frequent manager failure visit"),
+
+    // file visit
+    LOCAL_FILE_NOT_EXIST(31, "Local file not exist"),
+    LOCAL_FILE_EXPIRED(32, "Local file expired"),
+    READ_LOCAL_FILE_FAILURE(33, "Read local file failure"),
+    BLANK_FILE_CONTENT(34, "Blank file content"),
+    PARSE_FILE_CONTENT_FAILURE(35, "Parse file content failure"),
+    //
+    PARSE_FILE_CONTENT_IS_NULL(36, "Parse file content is null"),
+
+    // remote visit
+    BUILD_HTTP_CLIENT_EXCEPTION(41, "Build http client exception"),
+    HTTP_VISIT_EXCEPTION(42, "Visit http server exception"),
+    RMT_RETURN_FAILURE(43, "Http server return failure"),
+    RMT_RETURN_BLANK_CONTENT(44, "Http server return blank content"),
+    PARSE_RMT_CONTENT_FAILURE(45, "Parse manager content failure"),
+    //
+    PARSE_RMT_CONTENT_IS_NULL(46, "Parse manager content is null"),
+    RMT_RETURN_ERROR(47, "Manager return error info"),
+    META_FIELD_DATA_IS_NULL(48, "Field data is null"),
+    META_NODE_LIST_IS_EMPTY(49, "Field nodeList is empty"),
+    NODE_LIST_RECORD_INVALID(50, "No valid nodeList records"),
+    //
+    PARSE_PROXY_META_EXCEPTION(51, "No valid nodeList records"),
+    PARSE_ENCRYPT_META_EXCEPTION(52, "Parse encrypt content failure"),
+    META_REQUIRED_FIELD_NOT_EXIST(53, "Required meta field not exist"),
+    META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),
+
+    //
+    UNKNOWN_ERROR(9999, "Unknown error");
+
+    public static ErrorCode valueOf(int value) {
+        for (ErrorCode errCode : ErrorCode.values()) {
+            if (errCode.getErrCode() == value) {
+                return errCode;
+            }
+        }
+        return UNKNOWN_ERROR;
+    }
+
+    public static String getErrMsg(String errCode) {
+        int codeVal = NumberUtils.toInt(errCode, Integer.MAX_VALUE);
+        return valueOf(codeVal).errMsg;
+    }
+
+    private final int errCode;
+    private final String errMsg;
+
+    ErrorCode(int errCode, String errMsg) {
+        this.errCode = errCode;
+        this.errMsg = errMsg;
+    }
+
+    public int getErrCode() {
+        return errCode;
+    }
+
+    public String getErrMsg() {
+        return errMsg;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProcessResult.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProcessResult.java
new file mode 100644
index 0000000000..c3763e9c08
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProcessResult.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.common;
+
+/**
+ * Process Result class
+ *
+ * Used to return the result of task processing
+ */
+public class ProcessResult {
+
+    // error code
+    private int errCode = ErrorCode.UNKNOWN_ERROR.getErrCode();
+    // error message
+    private String errMsg = "";
+    // return data if success
+    private Object retData = null;
+
+    public ProcessResult() {
+        //
+    }
+
+    public ProcessResult(ErrorCode errCode) {
+        this.errCode = errCode.getErrCode();
+    }
+
+    public ProcessResult(ErrorCode errCode, String errMsg) {
+        this.errCode = errCode.getErrCode();
+        this.errMsg = errMsg;
+    }
+
+    public boolean setSuccess() {
+        this.errCode = ErrorCode.OK.getErrCode();
+        this.errMsg = "";
+        this.retData = null;
+        return isSuccess();
+    }
+
+    public boolean setSuccess(Object retData) {
+        this.errCode = ErrorCode.OK.getErrCode();
+        this.errMsg = "";
+        this.retData = retData;
+        return isSuccess();
+    }
+
+    public boolean setFailResult(ProcessResult other) {
+        this.errCode = other.getErrCode();
+        this.errMsg = other.getErrMsg();
+        this.retData = other.getRetData();
+        return isSuccess();
+    }
+
+    public boolean setFailResult(ErrorCode errCode) {
+        this.errCode = errCode.getErrCode();
+        this.errMsg = "";
+        this.retData = null;
+        return isSuccess();
+    }
+
+    public boolean setFailResult(ErrorCode errCode, String errMsg) {
+        this.errCode = errCode.getErrCode();
+        this.errMsg = errMsg;
+        this.retData = null;
+        return isSuccess();
+    }
+
+    public boolean isSuccess() {
+        return (this.errCode == ErrorCode.OK.getErrCode());
+    }
+
+    public int getErrCode() {
+        return errCode;
+    }
+
+    public String getErrMsg() {
+        return errMsg;
+    }
+
+    public Object getRetData() {
+        return retData;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("ProcessResult{");
+        sb.append("errCode=").append(errCode);
+        sb.append(", errMsg='").append(errMsg).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 8ae882e21b..1449a5f0d8 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -19,8 +19,6 @@ package org.apache.inlong.sdk.dataproxy.common;
 
 public class SdkConsts {
 
-    public static final String PROXY_SDK_VERSION = "1.2.11";
-
     public static String PREFIX_HTTP = "http://";
     public static String PREFIX_HTTPS = "https://";
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 36c8f3ec46..b587bfff0d 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -20,6 +20,8 @@ package org.apache.inlong.sdk.dataproxy.config;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.common.util.BasicAuth;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
@@ -108,6 +110,7 @@ public class ProxyConfigManager extends Thread {
     private String proxyConfigVisitUrl;
     private String proxyQueryFailKey;
     private String proxyConfigCacheFile;
+    private ProxyConfigEntry proxyConfigEntry = null;
     private List<HostInfo> proxyInfoList = new ArrayList<>();
     private int oldStat = 0;
     private String localMd5;
@@ -141,18 +144,15 @@ public class ProxyConfigManager extends Thread {
      * @param configure  proxy client configure
      * @return process result
      */
-    public Tuple2<Boolean, String> updProxyClientConfig(ProxyClientConfig 
configure) {
+    public boolean updProxyClientConfig(ProxyClientConfig configure, 
ProcessResult procResult) {
         if (this.shutDown.get()) {
-            return new Tuple2<>(false, "SDK has shutdown!");
-        }
-        if (configure == null) {
-            return new Tuple2<>(false, "ProxyClientConfig is null");
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
         if (this.clientManager != null) {
-            return new Tuple2<>(false, "Not allowed for non meta-query case!");
+            return procResult.setFailResult(ErrorCode.ILLEGAL_CALL_STATE);
         }
         this.storeAndBuildMetaConfigure(configure);
-        return new Tuple2<>(true, "OK");
+        return procResult.setSuccess();
     }
 
     public void shutDown() {
@@ -167,123 +167,95 @@ public class ProxyConfigManager extends Thread {
     }
 
     /**
-     * get groupId config
+     * query groupId configure from remote manager
      *
      * @return proxyConfigEntry
-     * @throws Exception ex
      */
-    public Tuple2<ProxyConfigEntry, String> getGroupIdConfigure(boolean 
needRetry) throws Exception {
+    public boolean getGroupIdConfigure(boolean needRetry, ProcessResult 
procResult) {
         if (shutDown.get()) {
-            return new Tuple2<>(null, "SDK has shutdown!");
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
         if (mgrConfig == null) {
-            return new Tuple2<>(null, "Configure not initialized!");
+            return 
procResult.setFailResult(ErrorCode.CONFIGURE_NOT_INITIALIZED);
         }
         if (mgrConfig.isOnlyUseLocalProxyConfig()) {
-            return getLocalProxyListFromFile(this.localProxyConfigStoreFile);
+            return getLocalProxyListFromFile(this.localProxyConfigStoreFile, 
procResult);
         } else {
             boolean readFromRmt = false;
-            Tuple2<ProxyConfigEntry, String> result;
-            result = tryToReadCacheProxyEntry();
-            if (result.getF0() == null) {
+            if (!tryToReadCacheProxyEntry(procResult)) {
                 int retryCount = 0;
-                do {
-                    result = requestProxyEntryQuietly();
-                    if (result.getF0() != null || !needRetry || 
shutDown.get()) {
-                        if (result.getF0() != null) {
-                            readFromRmt = true;
-                        }
+                while (!shutDown.get()) {
+                    if (requestProxyEntryQuietly(procResult)) {
+                        readFromRmt = true;
+                        break;
+                    }
+                    if (!needRetry
+                            || ++retryCount >= 
mgrConfig.getMetaQueryMaxRetryIfFail()
+                            || shutDown.get()) {
                         break;
                     }
                     // sleep then retry
                     
ProxyUtils.sleepSomeTime(mgrConfig.getMetaQueryWaitMsIfFail());
-                } while (++retryCount < 
mgrConfig.getMetaQueryMaxRetryIfFail());
+                }
             }
             if (shutDown.get()) {
-                return new Tuple2<>(null, "SDK has shutdown!");
+                return procResult.setFailResult(ErrorCode.SDK_CLOSED);
             }
-            if (result.getF0() == null) {
-                return new Tuple2<>(null, "Visit manager error:" + 
result.getF1());
-            } else if (readFromRmt) {
-                tryToWriteCacheProxyEntry(result.getF0());
+            if (readFromRmt && procResult.isSuccess()) {
+                tryToWriteCacheProxyEntry((ProxyConfigEntry) 
procResult.getRetData());
             }
-            return result;
+            return procResult.isSuccess();
         }
     }
 
     /**
-     * get encrypt config
+     * query encrypt configure from remote manager
      *
      * @return proxyConfigEntry
-     * @throws Exception ex
      */
-    public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean 
needRetry) throws Exception {
-        if (!mgrConfig.isEnableReportEncrypt()) {
-            return new Tuple2<>(null, "Not need data encrypt!");
-        }
+    public boolean getEncryptConfigure(boolean needRetry, ProcessResult 
procResult) {
         if (shutDown.get()) {
-            return new Tuple2<>(null, "SDK has shutdown!");
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
         if (mgrConfig == null) {
-            return new Tuple2<>(null, "Configure not initialized!");
-        }
-        EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
-        if (encryptEntry != null) {
-            return new Tuple2<>(encryptEntry, "Ok");
+            return 
procResult.setFailResult(ErrorCode.CONFIGURE_NOT_INITIALIZED);
         }
         boolean readFromRmt = false;
-        Tuple2<EncryptConfigEntry, String> result = readCachedPubKeyEntry();
-        if (result.getF0() == null) {
+        if (!readCachedPubKeyEntry(procResult)) {
             int retryCount = 0;
-            do {
-                result = requestPubKeyFromManager();
-                if (result.getF0() != null || !needRetry || shutDown.get()) {
-                    if (result.getF0() != null) {
-                        readFromRmt = true;
-                    }
+            while (!shutDown.get()) {
+                if (requestPubKeyFromManager(procResult)) {
+                    readFromRmt = true;
+                    break;
+                }
+                if (!needRetry
+                        || ++retryCount >= 
mgrConfig.getMetaQueryMaxRetryIfFail()
+                        || shutDown.get()) {
                     break;
                 }
                 // sleep then retry
                 ProxyUtils.sleepSomeTime(mgrConfig.getMetaQueryWaitMsIfFail());
-            } while (++retryCount < mgrConfig.getMetaQueryMaxRetryIfFail());
+            }
         }
         if (shutDown.get()) {
-            return new Tuple2<>(null, "SDK has shutdown!");
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        if (result.getF0() == null) {
-            return new Tuple2<>(null, "Visit manager error:" + result.getF1());
-        } else if (readFromRmt) {
-            updateEncryptConfigEntry(result.getF0());
-            writeCachePubKeyEntryFile(result.getF0());
+        if (readFromRmt && procResult.isSuccess()) {
+            writeCachePubKeyEntryFile((EncryptConfigEntry) 
procResult.getRetData());
         }
-        return result;
+        return procResult.isSuccess();
     }
 
     @Override
     public void run() {
         logger.info("ConfigManager({}) thread start, groupId={}",
                 this.callerId, mgrConfig.getInlongGroupId());
+        long curTime;
+        ProcessResult procResult = new ProcessResult();
         while (!shutDown.get()) {
             // update proxy nodes meta configures
-            try {
-                doProxyEntryQueryWork();
-            } catch (Throwable ex) {
-                if (exptCounter.shouldPrint()) {
-                    logger.warn("ConfigManager({}) refresh proxy configure 
exception, groupId={}",
-                            this.callerId, mgrConfig.getInlongGroupId(), ex);
-                }
-            }
-            // update encrypt configure
-            if (mgrConfig.isEnableReportEncrypt()) {
-                try {
-                    doEncryptConfigEntryQueryWork();
-                } catch (Throwable ex) {
-                    if (exptCounter.shouldPrint()) {
-                        logger.warn("ConfigManager({}) refresh encrypt info 
exception, groupId={}",
-                                this.callerId, mgrConfig.getInlongGroupId(), 
ex);
-                    }
-                }
-            }
+            curTime = System.currentTimeMillis();
+            updateMetaInfoFromRemote(procResult);
             if (shutDown.get()) {
                 break;
             }
@@ -294,141 +266,179 @@ public class ProxyConfigManager extends Thread {
                 this.callerId, this.mgrConfig.getInlongGroupId());
     }
 
+    private boolean updateMetaInfoFromRemote(ProcessResult procResult) {
+        // update proxy nodes meta configures
+        if (!doProxyEntryQueryWork(procResult)) {
+            return procResult.isSuccess();
+        }
+        if (mgrConfig.isEnableReportEncrypt()) {
+            if (!doEncryptConfigEntryQueryWork(procResult)) {
+                return procResult.isSuccess();
+            }
+        }
+        return procResult.setSuccess();
+    }
+
+    public ProxyConfigEntry getProxyConfigEntry() {
+        return this.proxyConfigEntry;
+    }
+
+    public EncryptConfigEntry getUserEncryptConfigEntry() {
+        return userEncryptConfigEntry;
+    }
     /**
      * request proxyHost list from manager, update ClientMgr.proxyHostList and 
channels
      *
      * @throws Exception
      */
-    public void doProxyEntryQueryWork() throws Exception {
-        if (shutDown.get() || this.clientManager == null) {
-            return;
-        }
+    public boolean doProxyEntryQueryWork(ProcessResult procResult) {
         /* Request the configuration from manager. */
         if (localMd5 == null) {
             localMd5 = calcHostInfoMd5(proxyInfoList);
         }
-        Tuple2<ProxyConfigEntry, String> result;
+        ProxyConfigEntry rmtProxyConfigEntry = null;
         if (mgrConfig.isOnlyUseLocalProxyConfig()) {
-            result = getLocalProxyListFromFile(this.localProxyConfigStoreFile);
+            if (!getLocalProxyListFromFile(this.localProxyConfigStoreFile, 
procResult)) {
+                return false;
+            }
+            rmtProxyConfigEntry = (ProxyConfigEntry) procResult.getRetData();
         } else {
             int retryCnt = 0;
-            do {
-                result = requestProxyEntryQuietly();
-                if (result.getF0() != null || shutDown.get()) {
+            while (!shutDown.get()) {
+                if (requestProxyEntryQuietly(procResult)) {
+                    break;
+                }
+                if (++retryCnt >= this.mgrConfig.getMetaSyncMaxRetryIfFail() 
|| shutDown.get()) {
                     break;
                 }
                 // sleep then retry.
                 ProxyUtils.sleepSomeTime(mgrConfig.getMetaSyncWaitMsIfFail());
-            } while (++retryCnt < this.mgrConfig.getMetaSyncMaxRetryIfFail() 
&& !shutDown.get());
+            }
             if (shutDown.get()) {
-                return;
+                return procResult.setFailResult(ErrorCode.SDK_CLOSED);
             }
-            if (result.getF0() != null) {
-                tryToWriteCacheProxyEntry(result.getF0());
+            if (procResult.isSuccess()) {
+                rmtProxyConfigEntry = (ProxyConfigEntry) 
procResult.getRetData();
+                tryToWriteCacheProxyEntry(rmtProxyConfigEntry);
             }
             /* We should exit if no local IP list and can't request it from 
TDManager. */
-            if (localMd5 == null && result.getF0() == null) {
+            if (localMd5 == null && rmtProxyConfigEntry == null) {
                 if (exptCounter.shouldPrint()) {
                     logger.warn("ConfigManager({}) connect manager({}) 
failure, get cached configure, groupId={}",
                             this.callerId, this.proxyConfigVisitUrl, 
this.mgrConfig.getInlongGroupId());
                 }
-                result = tryToReadCacheProxyEntry();
+                if (tryToReadCacheProxyEntry(procResult)) {
+                    rmtProxyConfigEntry = (ProxyConfigEntry) 
procResult.getRetData();
+                }
             }
-            if (localMd5 != null && result.getF0() == null && proxyInfoList != 
null) {
+            if (localMd5 != null && rmtProxyConfigEntry == null && 
proxyInfoList != null) {
                 if (exptCounter.shouldPrint()) {
                     logger.warn("ConfigManager({}) connect manager({}) 
failure, using the last configure, groupId={}",
                             this.callerId, this.proxyConfigVisitUrl, 
this.mgrConfig.getInlongGroupId());
                 }
             }
         }
-        if (localMd5 == null && result.getF0() == null && proxyInfoList == 
null) {
-            if (mgrConfig.isOnlyUseLocalProxyConfig()) {
-                throw new Exception("Read local proxy configure failure, 
please check first!");
-            } else {
-                throw new Exception("Connect Manager failure, please check 
first!");
+        if (localMd5 == null && rmtProxyConfigEntry == null && proxyInfoList 
== null) {
+            if (exptCounter.shouldPrint()) {
+                if (mgrConfig.isOnlyUseLocalProxyConfig()) {
+                    logger.warn("ConfigManager({}) continue fetch proxy meta 
failure, localFile={}, groupId={}",
+                            this.callerId, this.localProxyConfigStoreFile, 
this.mgrConfig.getInlongGroupId());
+                } else {
+                    logger.warn("ConfigManager({}) continue fetch proxy meta 
failure, manager={}, groupId={}",
+                            this.callerId, this.proxyConfigVisitUrl, 
this.mgrConfig.getInlongGroupId());
+                }
             }
+            return procResult.isSuccess();
         }
-        compareAndUpdateProxyList(result.getF0());
+        compareAndUpdateProxyList(rmtProxyConfigEntry);
+        return procResult.setSuccess();
     }
 
-    private void doEncryptConfigEntryQueryWork() throws Exception {
-        if (shutDown.get() || this.clientManager == null) {
-            return;
-        }
+    public boolean doEncryptConfigEntryQueryWork(ProcessResult procResult) {
         int retryCount = 0;
-        Tuple2<EncryptConfigEntry, String> result;
-        do {
-            result = requestPubKeyFromManager();
-            if (result.getF0() != null || shutDown.get()) {
+        while (!shutDown.get()) {
+            if (requestPubKeyFromManager(procResult)) {
+                break;
+            }
+            if (++retryCount >= this.mgrConfig.getMetaSyncMaxRetryIfFail() || 
shutDown.get()) {
                 break;
             }
             // sleep then retry
             ProxyUtils.sleepSomeTime(mgrConfig.getMetaSyncWaitMsIfFail());
-        } while (++retryCount < mgrConfig.getMetaSyncMaxRetryIfFail());
+        }
         if (shutDown.get()) {
-            return;
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        if (result.getF0() == null) {
-            if (this.userEncryptConfigEntry != null) {
-                logger.warn("ConfigManager({}) connect manager({}) failure, 
using the last pubKey, userName={}",
-                        this.callerId, this.encryptConfigVisitUrl, 
this.mgrConfig.getRptUserName());
-                return;
+        if (!procResult.isSuccess()) {
+            if (exptCounter.shouldPrint()) {
+                if (this.userEncryptConfigEntry == null) {
+                    logger.warn("ConfigManager({}) continue fetch encrypt meta 
failure, manager={}, username={}",
+                            this.callerId, this.encryptConfigVisitUrl, 
mgrConfig.getRptUserName());
+                } else {
+                    logger.warn("ConfigManager({}) fetch encrypt failure, 
manager={}, use the last pubKey, username={}",
+                            this.callerId, this.encryptConfigVisitUrl, 
mgrConfig.getRptUserName());
+                }
             }
-            throw new Exception("Visit manager error:" + result.getF1());
+            return procResult.isSuccess();
         }
-        updateEncryptConfigEntry(result.getF0());
-        writeCachePubKeyEntryFile(result.getF0());
+        EncryptConfigEntry rmtEncryptEntry =
+                (EncryptConfigEntry) procResult.getRetData();
+        updateEncryptConfigEntry(rmtEncryptEntry);
+        writeCachePubKeyEntryFile(rmtEncryptEntry);
+        return procResult.setSuccess();
     }
 
-    public Tuple2<ProxyConfigEntry, String> getLocalProxyListFromFile(String 
filePath) {
+    public boolean getLocalProxyListFromFile(String filePath, ProcessResult 
procResult) {
         String strRet;
         try {
             byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
             strRet = new String(fileBytes);
         } catch (Throwable ex) {
-            return new Tuple2<>(null, "Read local configure failure from "
-                    + filePath + ", reason is " + ex.getMessage());
+            return procResult.setFailResult(ErrorCode.READ_LOCAL_FILE_FAILURE,
+                    "Read local configure failure from "
+                            + filePath + ", reason is " + ex.getMessage());
         }
         if (StringUtils.isBlank(strRet)) {
-            return new Tuple2<>(null, "Blank configure local file from " + 
filePath);
+            return procResult.setFailResult(ErrorCode.BLANK_FILE_CONTENT,
+                    "Blank configure local file from " + filePath);
         }
-        return getProxyConfigEntry(false, strRet);
+        return getProxyConfigEntry(false, strRet, procResult);
     }
 
-    private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() {
+    private boolean requestProxyEntryQuietly(ProcessResult procResult) {
         // check cache failure
         String qryResult = getManagerQryResultInFailStatus(true);
         if (qryResult != null) {
-            return new Tuple2<>(null, "Query fail(" + qryResult + ") just now, 
please retry later!");
+            return 
procResult.setFailResult(ErrorCode.FREQUENT_RMT_FAILURE_VISIT,
+                    "Query fail(" + qryResult + ") just now, retry later!");
         }
         // request meta info from manager
         List<BasicNameValuePair> params = buildProxyNodeQueryParams();
         logger.debug("ConfigManager({}) request configure to manager({}), 
param={}",
                 this.callerId, this.proxyConfigVisitUrl, params);
-        Tuple2<Boolean, String> queryResult =
-                requestConfiguration(true, this.proxyConfigVisitUrl, params);
-        if (!queryResult.getF0()) {
-            return new Tuple2<>(null, queryResult.getF1());
+
+        if (!requestConfiguration(true, this.proxyConfigVisitUrl, params, 
procResult)) {
+            return false;
         }
+        String content = (String) procResult.getRetData();
         // parse result
         logger.debug("ConfigManager({}) received configure, from manager({}), 
groupId={}, result={}",
-                callerId, proxyConfigVisitUrl, mgrConfig.getInlongGroupId(), 
queryResult.getF1());
+                callerId, proxyConfigVisitUrl, mgrConfig.getInlongGroupId(), 
content);
         try {
-            Tuple2<ProxyConfigEntry, String> parseResult =
-                    getProxyConfigEntry(true, queryResult.getF1());
-            if (parseResult.getF0() == null) {
-                bookManagerQryFailStatus(true, parseResult.getF1());
-            } else {
+            if (getProxyConfigEntry(true, content, procResult)) {
                 rmvManagerQryFailStatus(true);
+            } else {
+                bookManagerQryFailStatus(true, procResult.getErrMsg());
             }
-            return parseResult;
+            return procResult.isSuccess();
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
-                logger.warn("ConfigManager({}) parse failure, from 
manager({}), groupId={}, result={}",
-                        callerId, proxyConfigVisitUrl, 
mgrConfig.getInlongGroupId(), queryResult.getF1(), ex);
+                logger.warn("ConfigManager({}) parse exception, from 
manager({}), groupId={}, result={}",
+                        callerId, proxyConfigVisitUrl, 
mgrConfig.getInlongGroupId(), content, ex);
             }
             bookManagerQryFailStatus(true, ex.getMessage());
-            return new Tuple2<>(null, ex.getMessage());
+            return procResult.setFailResult(
+                    ErrorCode.PARSE_PROXY_META_EXCEPTION, ex.getMessage());
         }
     }
 
@@ -468,6 +478,7 @@ public class ProxyConfigManager extends Thread {
             newProxyNodeList = new ArrayList<>(proxyInfoList.size());
             newProxyNodeList.addAll(proxyInfoList);
         } else {
+            this.proxyConfigEntry = proxyEntry;
             newSwitchStat = proxyEntry.getSwitchStat();
             newProxyNodeList = new ArrayList<>(proxyEntry.getSize());
             for (Map.Entry<String, HostInfo> entry : 
proxyEntry.getHostMap().entrySet()) {
@@ -516,7 +527,7 @@ public class ProxyConfigManager extends Thread {
      *
      * @return read result
      */
-    private Tuple2<ProxyConfigEntry, String> tryToReadCacheProxyEntry() {
+    private boolean tryToReadCacheProxyEntry(ProcessResult procResult) {
         fileRw.readLock().lock();
         try {
             File file = new File(this.proxyConfigCacheFile);
@@ -526,63 +537,65 @@ public class ProxyConfigManager extends Thread {
                         && diffTime < mgrConfig.getMetaCacheExpiredMs()) {
                     JsonReader reader = new JsonReader(new 
FileReader(this.proxyConfigCacheFile));
                     ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, 
ProxyConfigEntry.class);
-                    return new Tuple2<>(proxyConfigEntry, "Ok");
+                    return procResult.setSuccess(proxyConfigEntry);
                 }
-                return new Tuple2<>(null, "cache configure expired!");
+                return procResult.setFailResult(ErrorCode.LOCAL_FILE_EXPIRED);
             } else {
-                return new Tuple2<>(null, "no cache configure!");
+                return 
procResult.setFailResult(ErrorCode.LOCAL_FILE_NOT_EXIST);
             }
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
                 logger.warn("ConfigManager({}) read cache file({}) exception, 
groupId={}",
                         this.callerId, this.proxyConfigCacheFile, 
this.mgrConfig.getInlongGroupId(), ex);
             }
-            return new Tuple2<>(null, "read cache configure failure:" + 
ex.getMessage());
+            return procResult.setFailResult(
+                    ErrorCode.READ_LOCAL_FILE_FAILURE, "read cache configure 
failure:" + ex.getMessage());
         } finally {
             fileRw.readLock().unlock();
         }
     }
 
-    private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() {
+    private boolean requestPubKeyFromManager(ProcessResult procResult) {
         // check cache failure
         String qryResult = getManagerQryResultInFailStatus(false);
         if (qryResult != null) {
-            return new Tuple2<>(null, "Query fail(" + qryResult + ") just now, 
please retry later!");
+            return procResult.setFailResult(ErrorCode.FREQUENT_RMT_FAILURE_VISIT,
+                    "Query fail(" + qryResult + ") just now, retry later!");
         }
         // request meta info from manager
         List<BasicNameValuePair> params = buildPubKeyQueryParams();
         logger.debug("ConfigManager({}) request pubkey to manager({}), 
param={}",
                 this.callerId, this.encryptConfigVisitUrl, params);
-        Tuple2<Boolean, String> queryResult =
-                requestConfiguration(false, this.encryptConfigVisitUrl, 
params);
-        if (!queryResult.getF0()) {
-            return new Tuple2<>(null, queryResult.getF1());
+        if (!requestConfiguration(false, this.encryptConfigVisitUrl, params, 
procResult)) {
+            return false;
         }
+        String content = (String) procResult.getRetData();
         logger.debug("ConfigManager({}) received pubkey from manager({}), 
result={}",
-                this.callerId, this.encryptConfigVisitUrl, 
queryResult.getF1());
+                this.callerId, this.encryptConfigVisitUrl, content);
         String errorMsg;
         JsonObject pubKeyConf;
         try {
-            pubKeyConf = 
JsonParser.parseString(queryResult.getF1()).getAsJsonObject();
+            pubKeyConf = JsonParser.parseString(content).getAsJsonObject();
         } catch (Throwable ex) {
             if (parseCounter.shouldPrint()) {
                 logger.warn("ConfigManager({}) parse failure, userName={}, 
config={}!",
-                        this.callerId, this.mgrConfig.getRptUserName(), 
queryResult.getF1());
+                        this.callerId, this.mgrConfig.getRptUserName(), 
content);
             }
             errorMsg = "parse pubkey failure:" + ex.getMessage();
             bookManagerQryFailStatus(false, errorMsg);
-            return new Tuple2<>(null, errorMsg);
+            return procResult.setFailResult(
+                    ErrorCode.PARSE_RMT_CONTENT_FAILURE, errorMsg);
         }
         if (pubKeyConf == null) {
             errorMsg = "No public key information";
             bookManagerQryFailStatus(false, errorMsg);
-            return new Tuple2<>(null, errorMsg);
+            return 
procResult.setFailResult(ErrorCode.PARSE_RMT_CONTENT_IS_NULL);
         }
         try {
             if (!pubKeyConf.has("resultCode")) {
                 if (parseCounter.shouldPrint()) {
                     logger.warn("ConfigManager({}) config failure: resultCode 
field not exist, userName={}, config={}!",
-                            this.callerId, this.mgrConfig.getRptUserName(), 
queryResult.getF1());
+                            this.callerId, this.mgrConfig.getRptUserName(), 
content);
                 }
                 throw new Exception("resultCode field not exist");
             }
@@ -590,14 +603,14 @@ public class ProxyConfigManager extends Thread {
             if (resultCode != 0) {
                 if (parseCounter.shouldPrint()) {
                     logger.warn("ConfigManager({}) config failure: resultCode 
!= 0, userName={}, config={}!",
-                            this.callerId, this.mgrConfig.getRptUserName(), 
queryResult.getF1());
+                            this.callerId, this.mgrConfig.getRptUserName(), 
content);
                 }
                 throw new Exception("resultCode != 0!");
             }
             if (!pubKeyConf.has("resultData")) {
                 if (parseCounter.shouldPrint()) {
                     logger.warn("ConfigManager({}) config failure: resultData 
field not exist, userName={}, config={}!",
-                            this.callerId, this.mgrConfig.getRptUserName(), 
queryResult.getF1());
+                            this.callerId, this.mgrConfig.getRptUserName(), 
content);
                 }
                 throw new Exception("resultData field not exist");
             }
@@ -607,7 +620,7 @@ public class ProxyConfigManager extends Thread {
                 if (StringUtils.isBlank(publicKey)) {
                     if (parseCounter.shouldPrint()) {
                         logger.warn("ConfigManager({}) config failure: 
publicKey is blank, userName={}, config={}!",
-                                this.callerId, 
this.mgrConfig.getRptUserName(), queryResult.getF1());
+                                this.callerId, 
this.mgrConfig.getRptUserName(), content);
                     }
                     throw new Exception("publicKey is blank!");
                 }
@@ -615,7 +628,7 @@ public class ProxyConfigManager extends Thread {
                 if (StringUtils.isBlank(username)) {
                     if (parseCounter.shouldPrint()) {
                         logger.warn("ConfigManager({}) config failure: 
username is blank, userName={}, config={}!",
-                                this.callerId, 
this.mgrConfig.getRptUserName(), queryResult.getF1());
+                                this.callerId, 
this.mgrConfig.getRptUserName(), content);
                     }
                     throw new Exception("username is blank!");
                 }
@@ -623,17 +636,17 @@ public class ProxyConfigManager extends Thread {
                 if (StringUtils.isBlank(versionStr)) {
                     if (parseCounter.shouldPrint()) {
                         logger.warn("ConfigManager({}) config failure: version 
is blank, userName={}, config={}!",
-                                this.callerId, 
this.mgrConfig.getRptUserName(), queryResult.getF1());
+                                this.callerId, 
this.mgrConfig.getRptUserName(), content);
                     }
                     throw new Exception("version is blank!");
                 }
                 rmvManagerQryFailStatus(false);
-                return new Tuple2<>(new EncryptConfigEntry(username, 
versionStr, publicKey), "Ok");
+                return procResult.setSuccess(new EncryptConfigEntry(username, 
versionStr, publicKey));
             }
             throw new Exception("resultData value is null!");
         } catch (Throwable ex) {
             bookManagerQryFailStatus(false, ex.getMessage());
-            return new Tuple2<>(null, ex.getMessage());
+            return 
procResult.setFailResult(ErrorCode.PARSE_ENCRYPT_META_EXCEPTION, 
ex.getMessage());
         }
     }
 
@@ -642,7 +655,7 @@ public class ProxyConfigManager extends Thread {
         this.userEncryptConfigEntry = newEncryptEntry;
     }
 
-    private Tuple2<EncryptConfigEntry, String> readCachedPubKeyEntry() {
+    private boolean readCachedPubKeyEntry(ProcessResult procResult) {
         ObjectInputStream is;
         FileInputStream fis = null;
         EncryptConfigEntry entry;
@@ -658,18 +671,19 @@ public class ProxyConfigManager extends Thread {
                     entry = (EncryptConfigEntry) is.readObject();
                     // is.close();
                     fis.close();
-                    return new Tuple2<>(entry, "Ok");
+                    return procResult.setSuccess(entry);
                 }
-                return new Tuple2<>(null, "cache PubKeyEntry expired!");
+                return procResult.setFailResult(ErrorCode.LOCAL_FILE_EXPIRED);
             } else {
-                return new Tuple2<>(null, "no PubKeyEntry file!");
+                return 
procResult.setFailResult(ErrorCode.LOCAL_FILE_NOT_EXIST);
             }
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
                 logger.warn("ConfigManager({}) read({}) file exception, 
userName={}",
                         callerId, encryptConfigCacheFile, 
mgrConfig.getRptUserName(), ex);
             }
-            return new Tuple2<>(null, "read PubKeyEntry file failure:" + 
ex.getMessage());
+            return procResult.setFailResult(
+                    ErrorCode.READ_LOCAL_FILE_FAILURE, "read PubKeyEntry file 
failure:" + ex.getMessage());
         } finally {
             if (fis != null) {
                 try {
@@ -717,8 +731,8 @@ public class ProxyConfigManager extends Thread {
     }
 
     /* Request new configurations from Manager. */
-    private Tuple2<Boolean, String> requestConfiguration(
-            boolean queryProxyInfo, String url, List<BasicNameValuePair> 
params) {
+    private boolean requestConfiguration(
+            boolean queryProxyInfo, String url, List<BasicNameValuePair> 
params, ProcessResult procResult) {
         HttpParams myParams = new BasicHttpParams();
         HttpConnectionParams.setConnectionTimeout(myParams, 
mgrConfig.getMgrConnTimeoutMs());
         HttpConnectionParams.setSoTimeout(myParams, 
mgrConfig.getMgrSocketTimeoutMs());
@@ -735,39 +749,39 @@ public class ProxyConfigManager extends Thread {
                 logger.warn("ConfigManager({}) create Http(s) client failure, 
url={}, params={}",
                         this.callerId, url, params, eHttp);
             }
-            return new Tuple2<>(false, eHttp.getMessage());
+            return procResult.setFailResult(
+                    ErrorCode.BUILD_HTTP_CLIENT_EXCEPTION, eHttp.getMessage());
         }
         // post request and get response
         HttpPost httpPost = null;
         try {
+            String errMsg;
             httpPost = new HttpPost(url);
             this.addAuthorizationInfo(httpPost);
             UrlEncodedFormEntity urlEncodedFormEntity =
                     new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
             httpPost.setEntity(urlEncodedFormEntity);
             HttpResponse response = httpClient.execute(httpPost);
-            String errMsg;
+            String returnStr = EntityUtils.toString(response.getEntity());
             if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                errMsg = response.getStatusLine().getStatusCode()
-                        + ":" + response.getStatusLine().getReasonPhrase();
+                errMsg = response.getStatusLine().getStatusCode() + ":" + 
returnStr;
                 if (response.getStatusLine().getStatusCode() >= 500) {
                     bookManagerQryFailStatus(queryProxyInfo, errMsg);
                 }
-                return new Tuple2<>(false, errMsg);
+                return procResult.setFailResult(ErrorCode.RMT_RETURN_FAILURE, 
errMsg);
             }
-            String returnStr = EntityUtils.toString(response.getEntity());
             if (StringUtils.isBlank(returnStr)) {
                 errMsg = "server return blank entity!";
                 bookManagerQryFailStatus(queryProxyInfo, errMsg);
-                return new Tuple2<>(false, errMsg);
+                return 
procResult.setFailResult(ErrorCode.RMT_RETURN_BLANK_CONTENT, errMsg);
             }
-            return new Tuple2<>(true, returnStr);
+            return procResult.setSuccess(returnStr);
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
                 logger.warn("ConfigManager({}) connect manager({}) exception, 
params={}",
                         this.callerId, url, params, ex);
             }
-            return new Tuple2<>(false, ex.getMessage());
+            return procResult.setFailResult(ErrorCode.HTTP_VISIT_EXCEPTION, 
ex.getMessage());
         } finally {
             if (httpPost != null) {
                 httpPost.releaseConnection();
@@ -786,8 +800,8 @@ public class ProxyConfigManager extends Thread {
             headers.add(new BasicHeader(paramItem.getName(), 
paramItem.getValue()));
         }
         RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(mgrConfig.getMgrConnTimeoutMs())
-                .setSocketTimeout(mgrConfig.getMgrSocketTimeoutMs()).build();
+                .setSocketTimeout(mgrConfig.getMgrSocketTimeoutMs())
+                .setConnectTimeout(mgrConfig.getMgrConnTimeoutMs()).build();
         SSLContext sslContext = SSLContexts.custom().build();
         SSLConnectionSocketFactory sslSf = new 
SSLConnectionSocketFactory(sslContext,
                 new String[]{mgrConfig.getTlsVersion()}, null,
@@ -894,7 +908,7 @@ public class ProxyConfigManager extends Thread {
         return null;
     }
 
-    private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(boolean 
fromManager, String strRet) {
+    protected boolean getProxyConfigEntry(boolean fromManager, String strRet, 
ProcessResult procResult) {
         DataProxyNodeResponse proxyNodeConfig;
         if (fromManager) {
             ProxyClusterConfig clusterConfig;
@@ -905,16 +919,18 @@ public class ProxyConfigManager extends Thread {
                     logger.warn("ConfigManager({}) parse exception, 
groupId={}, config={}",
                             this.callerId, mgrConfig.getInlongGroupId(), 
strRet, ex);
                 }
-                return new Tuple2<>(null, "parse failure:" + ex.getMessage());
+                return procResult.setFailResult(
+                        ErrorCode.PARSE_RMT_CONTENT_FAILURE, "parse failure:" 
+ ex.getMessage());
             }
             if (clusterConfig == null) {
-                return new Tuple2<>(null, "content parse result is null!");
+                return 
procResult.setFailResult(ErrorCode.PARSE_RMT_CONTENT_IS_NULL);
             }
             if (!clusterConfig.isSuccess()) {
-                return new Tuple2<>(null, clusterConfig.getErrMsg());
+                return procResult.setFailResult(
+                        ErrorCode.RMT_RETURN_ERROR, clusterConfig.getErrMsg());
             }
             if (clusterConfig.getData() == null) {
-                return new Tuple2<>(null, "return data content is null!");
+                return 
procResult.setFailResult(ErrorCode.META_FIELD_DATA_IS_NULL);
             }
             proxyNodeConfig = clusterConfig.getData();
         } else {
@@ -925,16 +941,17 @@ public class ProxyConfigManager extends Thread {
                     logger.warn("ConfigManager({}) parse local file exception, 
groupId={}, config={}",
                             this.callerId, mgrConfig.getInlongGroupId(), 
strRet, ex);
                 }
-                return new Tuple2<>(null, "parse file failure:" + 
ex.getMessage());
+                return procResult.setFailResult(
+                        ErrorCode.PARSE_FILE_CONTENT_FAILURE, "parse failure:" 
+ ex.getMessage());
             }
             if (proxyNodeConfig == null) {
-                return new Tuple2<>(null, "file content parse result is 
null!");
+                return 
procResult.setFailResult(ErrorCode.PARSE_FILE_CONTENT_IS_NULL);
             }
         }
         // parse nodeList
         List<DataProxyNodeInfo> nodeList = proxyNodeConfig.getNodeList();
         if (CollectionUtils.isEmpty(nodeList)) {
-            return new Tuple2<>(null, "nodeList is empty!");
+            return procResult.setFailResult(ErrorCode.META_NODE_LIST_IS_EMPTY);
         }
         HostInfo tmpHostInfo;
         Map<String, HostInfo> hostMap = new HashMap<>();
@@ -953,7 +970,7 @@ public class ProxyConfigManager extends Thread {
             hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
         }
         if (hostMap.isEmpty()) {
-            return new Tuple2<>(null, "no valid nodeList records!");
+            return 
procResult.setFailResult(ErrorCode.NODE_LIST_RECORD_INVALID);
         }
         // parse clusterId
         int clusterId = -1;
@@ -985,6 +1002,6 @@ public class ProxyConfigManager extends Thread {
         proxyEntry.setLoad(load);
         proxyEntry.setMaxPacketLength(
                 proxyNodeConfig.getMaxPacketLength() != null ? 
proxyNodeConfig.getMaxPacketLength() : -1);
-        return new Tuple2<>(proxyEntry, "ok");
+        return procResult.setSuccess(proxyEntry);
     }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index b9eec02799..6fb1caf59f 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.network;
 
 import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
@@ -104,8 +105,9 @@ public class ClientMgr {
         if (!started.compareAndSet(false, true)) {
             return;
         }
+        ProcessResult procResult = new ProcessResult();
         try {
-            this.configManager.doProxyEntryQueryWork();
+            this.configManager.doProxyEntryQueryWork(procResult);
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
                 logger.error("ClientMgr({}) query {} exception",
@@ -159,22 +161,23 @@ public class ClientMgr {
         if (!this.started.get()) {
             throw new Exception("SDK not start or has shutdown!");
         }
-        Tuple2<ProxyConfigEntry, String> result =
-                configManager.getGroupIdConfigure(true);
-        if (result.getF0() == null) {
-            throw new Exception(result.getF1());
+        ProcessResult procResult = new ProcessResult();
+        if (!configManager.getGroupIdConfigure(true, procResult)) {
+            throw new Exception(procResult.toString());
         }
-        return result.getF0();
+        return (ProxyConfigEntry) procResult.getRetData();
     }
 
     public EncryptConfigEntry getEncryptConfigureInfo() {
         if (!this.started.get()) {
             return null;
         }
-        Tuple2<EncryptConfigEntry, String> result;
+        ProcessResult procResult = new ProcessResult();
         try {
-            result = configManager.getEncryptConfigure(false);
-            return result.getF0();
+            if (configManager.getEncryptConfigure(false, procResult)) {
+                return (EncryptConfigEntry) procResult.getRetData();
+            }
+            return null;
         } catch (Throwable ex) {
             return null;
         }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 9da6305b49..eec1326747 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.dataproxy.network;
 
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
@@ -25,7 +26,6 @@ import 
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
 import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
-import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,10 +92,12 @@ public class HttpProxySender extends Thread {
      *
      * @return proxy config entry.
      */
-    private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
-        Tuple2<ProxyConfigEntry, String> result =
-                proxyConfigManager.getGroupIdConfigure(true);
-        return result.getF0();
+    private ProxyConfigEntry retryGettingProxyConfig() {
+        ProcessResult procResult = new ProcessResult();
+        if (proxyConfigManager.getGroupIdConfigure(true, procResult)) {
+            return (ProxyConfigEntry) procResult.getRetData();
+        }
+        return null;
     }
 
     /**
@@ -103,19 +105,19 @@ public class HttpProxySender extends Thread {
      */
     @Override
     public void run() {
+        ProcessResult procResult = new ProcessResult();
         while (!bShutDown) {
             try {
                 int rand = ThreadLocalRandom.current().nextInt(0, 600);
                 long randSleepTime = proxyClientConfig.getMgrMetaSyncInrMs() + 
rand;
                 TimeUnit.MILLISECONDS.sleep(randSleepTime);
                 if (proxyConfigManager != null) {
-                    Tuple2<ProxyConfigEntry, String> result =
-                            proxyConfigManager.getGroupIdConfigure(false);
-                    if (result.getF0() == null) {
-                        throw new Exception(result.getF1());
+                    if (!proxyConfigManager.getGroupIdConfigure(false, 
procResult)) {
+                        throw new Exception(procResult.toString());
                     }
-                    hostList.addAll(result.getF0().getHostMap().values());
-                    hostList.retainAll(result.getF0().getHostMap().values());
+                    ProxyConfigEntry configEntry = (ProxyConfigEntry) 
procResult.getRetData();
+                    hostList.addAll(configEntry.getHostMap().values());
+                    hostList.retainAll(configEntry.getHostMap().values());
                 } else {
                     logger.error("manager is null, please check it!");
                 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index c74aea445e..0367e6e06a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -26,12 +26,14 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.InputStream;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 public class ProxyUtils {
@@ -43,9 +45,11 @@ public class ProxyUtils {
     private static final Set<String> invalidAttr = new HashSet<>();
     public static final Set<MsgType> SdkAllowedMsgType = new HashSet<>();
     private static String localHost;
+    private static String sdkVersion;
 
     static {
         localHost = getLocalIp();
+        getJarVersion();
         Collections.addAll(invalidAttr, "groupId", "streamId", "dt", 
"msgUUID", "cp",
                 "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", 
"_file_status_check", "_secretId",
                 "_signature", "_timeStamp", "_nonce", "_userName", 
"_clientIP", "_encyVersion", "_encyAesKey",
@@ -72,6 +76,22 @@ public class ProxyUtils {
         return ip;
     }
 
+    public static String getJarVersion() {
+        if (sdkVersion != null) {
+            return sdkVersion;
+        }
+        Properties properties = new Properties();
+        try (InputStream is = 
ProxyUtils.class.getResourceAsStream("/git.properties")) {
+            properties.load(is);
+            sdkVersion = properties.getProperty("git.build.version");
+        } catch (Throwable ex) {
+            if (exceptCounter.shouldPrint()) {
+                logger.error("DataProxy-SDK get version failure", ex);
+            }
+        }
+        return sdkVersion;
+    }
+
     public static boolean sleepSomeTime(long sleepTimeMs) {
         try {
             Thread.sleep(sleepTimeMs);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index c7e3f773e2..da34136449 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
-import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -47,8 +47,9 @@ public class ProxyConfigManagerTest {
 
     @Test
     public void testProxyConfigParse() throws Exception {
-        Tuple2<ProxyConfigEntry, String> result = 
proxyConfigManager.getLocalProxyListFromFile(localFile);
-        ProxyConfigEntry proxyEntry = result.getF0();
+        ProcessResult procResult = new ProcessResult();
+        proxyConfigManager.getLocalProxyListFromFile(localFile, procResult);
+        ProxyConfigEntry proxyEntry = (ProxyConfigEntry) 
procResult.getRetData();
         Assert.assertEquals(proxyEntry.isInterVisit(), false);
         Assert.assertEquals(proxyEntry.getLoad(), 12);
         Assert.assertEquals(proxyEntry.getClusterId(), 1);
diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index 1ce9d45d3e..d646c2d1b4 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -31,8 +31,8 @@
 
     <modules>
         <module>sdk-common</module>
-        <module>sort-sdk</module>
         <module>dataproxy-sdk</module>
+        <module>sort-sdk</module>
         <module>transform-sdk</module>
         <module>dirty-data-sdk</module>
     </modules>

Reply via email to