This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8ac370c4a [INLONG-6656][SDK] Support QueryConsumerConfig plugin (#6713)
8ac370c4a is described below
commit 8ac370c4a907516250ed08df1a73b1a8ecbd486e
Author: vernedeng <[email protected]>
AuthorDate: Sat Dec 3 22:07:33 2022 +0800
[INLONG-6656][SDK] Support QueryConsumerConfig plugin (#6713)
---
.../org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java | 2 ++
.../apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java | 11 ++++++++++-
.../org/apache/inlong/sdk/sort/impl/SortClientImplV2.java | 4 +++-
.../config/loader/ClassResourceQueryConsumeConfig.java | 9 +++++++--
4 files changed, 22 insertions(+), 4 deletions(-)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
index 1f1048641..15180b56d 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/QueryConsumeConfig.java
@@ -23,4 +23,6 @@ public interface QueryConsumeConfig {
ConsumeConfig queryCurrentConsumeConfig(String sortTaskId);
+ void configure(ClientContext context);
+
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index aaf501f60..0302ab783 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -53,7 +53,7 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
private final Logger logger =
LoggerFactory.getLogger(QueryConsumeConfigImpl.class);
private final CloseableHttpClient httpClient = HttpClients.createDefault();
- private final ClientContext clientContext;
+ private ClientContext clientContext;
private String md5 = "";
private Map<String, List<InLongTopic>> subscribedTopic = new HashMap<>();
@@ -62,6 +62,10 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
this.clientContext = clientContext;
}
+ public QueryConsumeConfigImpl() {
+
+ }
+
private String getRequestUrlWithParam() {
return clientContext.getConfig().getManagerApiUrl() + "?clusterName="
+ clientContext.getConfig()
.getSortClusterName() + "&sortTaskId=" +
clientContext.getConfig().getSortTaskId() + "&md5=" + md5
@@ -199,4 +203,9 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
reload();
return new ConsumeConfig(subscribedTopic.get(sortTaskId));
}
+
+ @Override
+ public void configure(ClientContext context) {
+ this.clientContext = context;
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
index ecd0a10c0..3a730d082 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
@@ -54,6 +54,7 @@ public class SortClientImplV2 extends SortClient {
try {
this.sortClientConfig = sortClientConfig;
this.context = new ClientContextImpl(this.sortClientConfig, new
MetricReporterImpl(sortClientConfig));
+
this.inLongTopicManager = InlongTopicManagerFactory
.createInLongTopicManager(sortClientConfig.getTopicType(),
context, new QueryConsumeConfigImpl(context));
@@ -76,9 +77,10 @@ public class SortClientImplV2 extends SortClient {
try {
this.sortClientConfig = sortClientConfig;
this.context = new ClientContextImpl(this.sortClientConfig,
metricReporter);
+ queryConsumeConfig.configure(context);
this.inLongTopicManager = InlongTopicManagerFactory
.createInLongTopicManager(sortClientConfig.getTopicType(),
- context, new QueryConsumeConfigImpl(context));
+ context, queryConsumeConfig);
} catch (Exception e) {
e.printStackTrace();
this.close();
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
index baadbb433..0f14708b0 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceQueryConsumeConfig.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.inlong.common.pojo.sdk.CacheZone;
import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sdk.Topic;
+import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
@@ -38,7 +39,6 @@ import java.util.List;
import java.util.Map;
/**
- *
* ClassResourceQueryConsumeConfig
*/
public class ClassResourceQueryConsumeConfig implements QueryConsumeConfig {
@@ -47,7 +47,7 @@ public class ClassResourceQueryConsumeConfig implements
QueryConsumeConfig {
/**
* queryCurrentConsumeConfig
- *
+ *
* @param sortTaskId
* @return
*/
@@ -85,4 +85,9 @@ public class ClassResourceQueryConsumeConfig implements
QueryConsumeConfig {
return null;
}
+ @Override
+ public void configure(ClientContext context) {
+ // do nothing
+ }
+
}