This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f89062a83 [INLONG-5669][SDK] Extract SortClientConfig parameters key 
to constants (#6845)
f89062a83 is described below

commit f89062a83f5fbf6b951a50d48fe52ac58d59eb50
Author: vernedeng <[email protected]>
AuthorDate: Tue Dec 13 12:25:33 2022 +0800

    [INLONG-5669][SDK] Extract SortClientConfig parameters key to constants 
(#6845)
---
 .../inlong/sdk/sort/api/ConfigConstants.java       | 49 +++++++++++++++
 .../inlong/sdk/sort/api/SortClientConfig.java      | 71 +++++++++++++---------
 2 files changed, 90 insertions(+), 30 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
new file mode 100644
index 000000000..215892453
--- /dev/null
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.api;
+
+/**
+ * SortSdk config constants
+ */
+public class ConfigConstants {
+
+    public static final String CALLBACK_QUEUE_SIZE = "callbackQueueSize";
+    public static final String PULSAR_RECEIVE_QUEUE_SIZE = 
"pulsarReceiveQueueSize";
+    public static final String KAFKA_FETCH_WAIT_MS = "kafkaFetchWaitMs";
+    public static final String KAFKA_FETCH_SIZE_BYTES = "kafkaFetchSizeBytes";
+    public static final String KAFKA_SOCKET_RECV_BUFFER_SIZE = 
"kafkaSocketRecvBufferSize";
+    public static final String LOCAL_IP = "localIp";
+    public static final String APP_NAME = "appName";
+    public static final String SERVER_NAME = "serverName";
+    public static final String CONTAINER_ID = "containerId";
+    public static final String INSTANCE_NAME = "instanceName";
+    public static final String ENV = "env";
+    public static final String MANAGER_API_URL = "managerApiUrl";
+    public static final String MANAGER_API_VERSION = "managerApiVersion";
+    public static final String CONSUME_STRATEGY = "consumeStrategy";
+    public static final String TOPIC_MANAGER_TYPE = "topicManagerType";
+    public static final String REPORT_STATISTIC_INTERVAL_SEC = 
"reportStatisticIntervalSec";
+    public static final String UPDATE_META_DATA_INTERVAL_SEC = 
"updateMetaDataIntervalSec";
+    public static final String ACK_TIMEOUT_SEC = "ackTimeoutSec";
+    public static final String CLEAN_OLD_CONSUMER_INTERVAL_SEC = 
"cleanOldConsumerIntervalSec";
+    public static final String IS_PROMETHEUS_ENABLED = "isPrometheusEnabled";
+    public static final String EMPTY_POLL_SLEEP_STEP_MS = 
"emptyPollSleepStepMs";
+    public static final String MAX_EMPTY_POLL_SLEEP_MS = "maxEmptyPollSleepMs";
+    public static final String EMPTY_POLL_TIMES = "emptyPollTimes";
+    public static final String MAX_CONSUMER_SIZE = "maxConsumerSize";
+}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index 4997af672..6db58e9e0 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -362,42 +362,53 @@ public class SortClientConfig implements Serializable {
      * @param sortSdkParams
      */
     public void setParameters(Map<String, String> sortSdkParams) {
-        this.callbackQueueSize = 
NumberUtils.toInt(sortSdkParams.get("callbackQueueSize"), callbackQueueSize);
-        this.pulsarReceiveQueueSize = 
NumberUtils.toInt(sortSdkParams.get("pulsarReceiveQueueSize"),
+        this.callbackQueueSize =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CALLBACK_QUEUE_SIZE), 
callbackQueueSize);
+        this.pulsarReceiveQueueSize = 
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.PULSAR_RECEIVE_QUEUE_SIZE),
                 pulsarReceiveQueueSize);
-        this.kafkaFetchWaitMs = 
NumberUtils.toInt(sortSdkParams.get("kafkaFetchWaitMs"), kafkaFetchWaitMs);
-        this.kafkaFetchSizeBytes = 
NumberUtils.toInt(sortSdkParams.get("kafkaFetchSizeBytes"), 
kafkaFetchSizeBytes);
-        this.kafkaSocketRecvBufferSize = 
NumberUtils.toInt(sortSdkParams.get("kafkaSocketRecvBufferSize"),
-                kafkaSocketRecvBufferSize);
-
-        this.localIp = sortSdkParams.getOrDefault("localIp", localIp);
-        this.appName = sortSdkParams.getOrDefault("appName", appName);
-        this.serverName = sortSdkParams.getOrDefault("serverName", serverName);
-        this.containerId = sortSdkParams.getOrDefault("containerId", 
containerId);
-        this.instanceName = sortSdkParams.getOrDefault("instanceName", 
instanceName);
-        this.env = sortSdkParams.getOrDefault("env", env);
-        this.managerApiUrl = sortSdkParams.getOrDefault("managerApiUrl", 
managerApiUrl);
-        this.managerApiVersion = 
sortSdkParams.getOrDefault("managerApiVersion", managerApiVersion);
-        String strConsumeStrategy = 
sortSdkParams.getOrDefault("consumeStrategy", consumeStrategy.name());
-        String strManagerType = sortSdkParams.getOrDefault("topicManagerType",
+        this.kafkaFetchWaitMs =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_WAIT_MS), 
kafkaFetchWaitMs);
+        this.kafkaFetchSizeBytes =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_SIZE_BYTES), 
kafkaFetchSizeBytes);
+        this.kafkaSocketRecvBufferSize =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_SOCKET_RECV_BUFFER_SIZE),
+                        kafkaSocketRecvBufferSize);
+
+        this.localIp = sortSdkParams.getOrDefault(ConfigConstants.LOCAL_IP, 
localIp);
+        this.appName = sortSdkParams.getOrDefault(ConfigConstants.APP_NAME, 
appName);
+        this.serverName = 
sortSdkParams.getOrDefault(ConfigConstants.SERVER_NAME, serverName);
+        this.containerId = 
sortSdkParams.getOrDefault(ConfigConstants.CONTAINER_ID, containerId);
+        this.instanceName = 
sortSdkParams.getOrDefault(ConfigConstants.INSTANCE_NAME, instanceName);
+        this.env = sortSdkParams.getOrDefault(ConfigConstants.ENV, env);
+        this.managerApiUrl = 
sortSdkParams.getOrDefault(ConfigConstants.MANAGER_API_URL, managerApiUrl);
+        this.managerApiVersion = 
sortSdkParams.getOrDefault(ConfigConstants.MANAGER_API_VERSION, 
managerApiVersion);
+        String strConsumeStrategy =
+                sortSdkParams.getOrDefault(ConfigConstants.CONSUME_STRATEGY, 
consumeStrategy.name());
+        String strManagerType = 
sortSdkParams.getOrDefault(ConfigConstants.TOPIC_MANAGER_TYPE,
                 TopicType.MULTI_TOPIC.toString());
         this.consumeStrategy = ConsumeStrategy.valueOf(strConsumeStrategy);
         this.topicType = TopicType.valueOf(strManagerType);
 
-        this.reportStatisticIntervalSec = 
NumberUtils.toInt(sortSdkParams.get("reportStatisticIntervalSec"),
-                reportStatisticIntervalSec);
-        this.updateMetaDataIntervalSec = 
NumberUtils.toInt(sortSdkParams.get("updateMetaDataIntervalSec"),
-                updateMetaDataIntervalSec);
-        this.ackTimeoutSec = 
NumberUtils.toInt(sortSdkParams.get("ackTimeoutSec"), ackTimeoutSec);
-        this.cleanOldConsumerIntervalSec = 
NumberUtils.toInt(sortSdkParams.get("cleanOldConsumerIntervalSec"),
-                cleanOldConsumerIntervalSec);
-
-        String strPrometheusEnabled = 
sortSdkParams.getOrDefault("isPrometheusEnabled", Boolean.TRUE.toString());
+        this.reportStatisticIntervalSec =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.REPORT_STATISTIC_INTERVAL_SEC),
+                        reportStatisticIntervalSec);
+        this.updateMetaDataIntervalSec =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.UPDATE_META_DATA_INTERVAL_SEC),
+                        updateMetaDataIntervalSec);
+        this.ackTimeoutSec = 
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.ACK_TIMEOUT_SEC), 
ackTimeoutSec);
+        this.cleanOldConsumerIntervalSec =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CLEAN_OLD_CONSUMER_INTERVAL_SEC),
+                        cleanOldConsumerIntervalSec);
+
+        String strPrometheusEnabled =
+                
sortSdkParams.getOrDefault(ConfigConstants.IS_PROMETHEUS_ENABLED, 
Boolean.TRUE.toString());
         this.isPrometheusEnabled = 
StringUtils.equalsIgnoreCase(strPrometheusEnabled, Boolean.TRUE.toString());
 
-        this.emptyPollSleepStepMs = 
NumberUtils.toInt(sortSdkParams.get("emptyPollSleepStepMs"), 
emptyPollSleepStepMs);
-        this.maxEmptyPollSleepMs = 
NumberUtils.toInt(sortSdkParams.get("maxEmptyPollSleepMs"), 
maxEmptyPollSleepMs);
-        this.emptyPollTimes = 
NumberUtils.toInt(sortSdkParams.get("emptyPollTimes"), emptyPollTimes);
-        this.maxConsumerSize = 
NumberUtils.toInt(sortSdkParams.get("maxConsumerSize"), maxConsumerSize);
+        this.emptyPollSleepStepMs =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_SLEEP_STEP_MS), 
emptyPollSleepStepMs);
+        this.maxEmptyPollSleepMs =
+                
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_EMPTY_POLL_SLEEP_MS), 
maxEmptyPollSleepMs);
+        this.emptyPollTimes = 
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_TIMES), 
emptyPollTimes);
+        this.maxConsumerSize = 
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_CONSUMER_SIZE), 
maxConsumerSize);
     }
 }

Reply via email to