This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ac370c4a [INLONG-6656][SDK] Support QueryConsumerConfig plugin (#6713)
8ac370c4a is described below

commit 8ac370c4a907516250ed08df1a73b1a8ecbd486e
Author: vernedeng <[email protected]>
AuthorDate: Sat Dec 3 22:07:33 2022 +0800

    [INLONG-6656][SDK] Support QueryConsumerConfig plugin (#6713)
---
 .../org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java    |  2 ++
 .../apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java   | 11 ++++++++++-
 .../org/apache/inlong/sdk/sort/impl/SortClientImplV2.java     |  4 +++-
 .../config/loader/ClassResourceQueryConsumeConfig.java        |  9 +++++++--
 4 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
index 1f1048641..15180b56d 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
@@ -23,4 +23,6 @@ public interface QueryConsumeConfig {
 
     ConsumeConfig queryCurrentConsumeConfig(String sortTaskId);
 
+    void configure(ClientContext context);
+
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index aaf501f60..0302ab783 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -53,7 +53,7 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
     private final Logger logger = LoggerFactory.getLogger(QueryConsumeConfigImpl.class);
     private final CloseableHttpClient httpClient = HttpClients.createDefault();
 
-    private final ClientContext clientContext;
+    private ClientContext clientContext;
     private String md5 = "";
 
     private Map<String, List<InLongTopic>> subscribedTopic = new HashMap<>();
@@ -62,6 +62,10 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
         this.clientContext = clientContext;
     }
 
+    public QueryConsumeConfigImpl() {
+
+    }
+
     private String getRequestUrlWithParam() {
         return clientContext.getConfig().getManagerApiUrl() + "?clusterName=" + clientContext.getConfig()
                 .getSortClusterName() + "&sortTaskId=" + clientContext.getConfig().getSortTaskId() + "&md5=" + md5
@@ -199,4 +203,9 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
         reload();
         return new ConsumeConfig(subscribedTopic.get(sortTaskId));
     }
+
+    @Override
+    public void configure(ClientContext context) {
+        this.clientContext = context;
+    }
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
index ecd0a10c0..3a730d082 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
@@ -54,6 +54,7 @@ public class SortClientImplV2 extends SortClient {
         try {
             this.sortClientConfig = sortClientConfig;
             this.context = new ClientContextImpl(this.sortClientConfig, new MetricReporterImpl(sortClientConfig));
+
             this.inLongTopicManager = InlongTopicManagerFactory
                     .createInLongTopicManager(sortClientConfig.getTopicType(),
                             context, new QueryConsumeConfigImpl(context));
@@ -76,9 +77,10 @@ public class SortClientImplV2 extends SortClient {
         try {
             this.sortClientConfig = sortClientConfig;
             this.context = new ClientContextImpl(this.sortClientConfig, metricReporter);
+            queryConsumeConfig.configure(context);
             this.inLongTopicManager = InlongTopicManagerFactory
                     .createInLongTopicManager(sortClientConfig.getTopicType(),
-                            context, new QueryConsumeConfigImpl(context));
+                            context, queryConsumeConfig);
         } catch (Exception e) {
             e.printStackTrace();
             this.close();
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
index baadbb433..0f14708b0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.inlong.common.pojo.sdk.CacheZone;
 import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
 import org.apache.inlong.common.pojo.sdk.Topic;
+import org.apache.inlong.sdk.sort.api.ClientContext;
 import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
@@ -38,7 +39,6 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * 
  * ClassResourceQueryConsumeConfig
  */
 public class ClassResourceQueryConsumeConfig implements QueryConsumeConfig {
@@ -47,7 +47,7 @@ public class ClassResourceQueryConsumeConfig implements QueryConsumeConfig {
 
     /**
      * queryCurrentConsumeConfig
-     * 
+     *
      * @param  sortTaskId
      * @return
      */
@@ -85,4 +85,9 @@ public class ClassResourceQueryConsumeConfig implements QueryConsumeConfig {
         return null;
     }
 
+    @Override
+    public void configure(ClientContext context) {
+        // do nothing
+    }
+
 }

Reply via email to