This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f89062a83 [INLONG-5669][SDK] Extract SortClientConfig parameters key to constants (#6845)
f89062a83 is described below
commit f89062a83f5fbf6b951a50d48fe52ac58d59eb50
Author: vernedeng <[email protected]>
AuthorDate: Tue Dec 13 12:25:33 2022 +0800
[INLONG-5669][SDK] Extract SortClientConfig parameters key to constants (#6845)
---
.../inlong/sdk/sort/api/ConfigConstants.java | 49 +++++++++++++++
.../inlong/sdk/sort/api/SortClientConfig.java | 71 +++++++++++++---------
2 files changed, 90 insertions(+), 30 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
new file mode 100644
index 000000000..215892453
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.api;
+
+/**
+ * SortSdk config constants
+ */
+public class ConfigConstants {
+
+ public static final String CALLBACK_QUEUE_SIZE = "callbackQueueSize";
+ public static final String PULSAR_RECEIVE_QUEUE_SIZE =
"pulsarReceiveQueueSize";
+ public static final String KAFKA_FETCH_WAIT_MS = "kafkaFetchWaitMs";
+ public static final String KAFKA_FETCH_SIZE_BYTES = "kafkaFetchSizeBytes";
+ public static final String KAFKA_SOCKET_RECV_BUFFER_SIZE =
"kafkaSocketRecvBufferSize";
+ public static final String LOCAL_IP = "localIp";
+ public static final String APP_NAME = "appName";
+ public static final String SERVER_NAME = "serverName";
+ public static final String CONTAINER_ID = "containerId";
+ public static final String INSTANCE_NAME = "instanceName";
+ public static final String ENV = "env";
+ public static final String MANAGER_API_URL = "managerApiUrl";
+ public static final String MANAGER_API_VERSION = "managerApiVersion";
+ public static final String CONSUME_STRATEGY = "consumeStrategy";
+ public static final String TOPIC_MANAGER_TYPE = "topicManagerType";
+ public static final String REPORT_STATISTIC_INTERVAL_SEC =
"reportStatisticIntervalSec";
+ public static final String UPDATE_META_DATA_INTERVAL_SEC =
"updateMetaDataIntervalSec";
+ public static final String ACK_TIMEOUT_SEC = "ackTimeoutSec";
+ public static final String CLEAN_OLD_CONSUMER_INTERVAL_SEC =
"cleanOldConsumerIntervalSec";
+ public static final String IS_PROMETHEUS_ENABLED = "isPrometheusEnabled";
+ public static final String EMPTY_POLL_SLEEP_STEP_MS =
"emptyPollSleepStepMs";
+ public static final String MAX_EMPTY_POLL_SLEEP_MS = "maxEmptyPollSleepMs";
+ public static final String EMPTY_POLL_TIMES = "emptyPollTimes";
+ public static final String MAX_CONSUMER_SIZE = "maxConsumerSize";
+}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index 4997af672..6db58e9e0 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -362,42 +362,53 @@ public class SortClientConfig implements Serializable {
* @param sortSdkParams
*/
public void setParameters(Map<String, String> sortSdkParams) {
- this.callbackQueueSize =
NumberUtils.toInt(sortSdkParams.get("callbackQueueSize"), callbackQueueSize);
- this.pulsarReceiveQueueSize =
NumberUtils.toInt(sortSdkParams.get("pulsarReceiveQueueSize"),
+ this.callbackQueueSize =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CALLBACK_QUEUE_SIZE),
callbackQueueSize);
+ this.pulsarReceiveQueueSize =
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.PULSAR_RECEIVE_QUEUE_SIZE),
pulsarReceiveQueueSize);
- this.kafkaFetchWaitMs =
NumberUtils.toInt(sortSdkParams.get("kafkaFetchWaitMs"), kafkaFetchWaitMs);
- this.kafkaFetchSizeBytes =
NumberUtils.toInt(sortSdkParams.get("kafkaFetchSizeBytes"),
kafkaFetchSizeBytes);
- this.kafkaSocketRecvBufferSize =
NumberUtils.toInt(sortSdkParams.get("kafkaSocketRecvBufferSize"),
- kafkaSocketRecvBufferSize);
-
- this.localIp = sortSdkParams.getOrDefault("localIp", localIp);
- this.appName = sortSdkParams.getOrDefault("appName", appName);
- this.serverName = sortSdkParams.getOrDefault("serverName", serverName);
- this.containerId = sortSdkParams.getOrDefault("containerId",
containerId);
- this.instanceName = sortSdkParams.getOrDefault("instanceName",
instanceName);
- this.env = sortSdkParams.getOrDefault("env", env);
- this.managerApiUrl = sortSdkParams.getOrDefault("managerApiUrl",
managerApiUrl);
- this.managerApiVersion =
sortSdkParams.getOrDefault("managerApiVersion", managerApiVersion);
- String strConsumeStrategy =
sortSdkParams.getOrDefault("consumeStrategy", consumeStrategy.name());
- String strManagerType = sortSdkParams.getOrDefault("topicManagerType",
+ this.kafkaFetchWaitMs =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_WAIT_MS),
kafkaFetchWaitMs);
+ this.kafkaFetchSizeBytes =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_SIZE_BYTES),
kafkaFetchSizeBytes);
+ this.kafkaSocketRecvBufferSize =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_SOCKET_RECV_BUFFER_SIZE),
+ kafkaSocketRecvBufferSize);
+
+ this.localIp = sortSdkParams.getOrDefault(ConfigConstants.LOCAL_IP,
localIp);
+ this.appName = sortSdkParams.getOrDefault(ConfigConstants.APP_NAME,
appName);
+ this.serverName =
sortSdkParams.getOrDefault(ConfigConstants.SERVER_NAME, serverName);
+ this.containerId =
sortSdkParams.getOrDefault(ConfigConstants.CONTAINER_ID, containerId);
+ this.instanceName =
sortSdkParams.getOrDefault(ConfigConstants.INSTANCE_NAME, instanceName);
+ this.env = sortSdkParams.getOrDefault(ConfigConstants.ENV, env);
+ this.managerApiUrl =
sortSdkParams.getOrDefault(ConfigConstants.MANAGER_API_URL, managerApiUrl);
+ this.managerApiVersion =
sortSdkParams.getOrDefault(ConfigConstants.MANAGER_API_VERSION,
managerApiVersion);
+ String strConsumeStrategy =
+ sortSdkParams.getOrDefault(ConfigConstants.CONSUME_STRATEGY,
consumeStrategy.name());
+ String strManagerType =
sortSdkParams.getOrDefault(ConfigConstants.TOPIC_MANAGER_TYPE,
TopicType.MULTI_TOPIC.toString());
this.consumeStrategy = ConsumeStrategy.valueOf(strConsumeStrategy);
this.topicType = TopicType.valueOf(strManagerType);
- this.reportStatisticIntervalSec =
NumberUtils.toInt(sortSdkParams.get("reportStatisticIntervalSec"),
- reportStatisticIntervalSec);
- this.updateMetaDataIntervalSec =
NumberUtils.toInt(sortSdkParams.get("updateMetaDataIntervalSec"),
- updateMetaDataIntervalSec);
- this.ackTimeoutSec =
NumberUtils.toInt(sortSdkParams.get("ackTimeoutSec"), ackTimeoutSec);
- this.cleanOldConsumerIntervalSec =
NumberUtils.toInt(sortSdkParams.get("cleanOldConsumerIntervalSec"),
- cleanOldConsumerIntervalSec);
-
- String strPrometheusEnabled =
sortSdkParams.getOrDefault("isPrometheusEnabled", Boolean.TRUE.toString());
+ this.reportStatisticIntervalSec =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.REPORT_STATISTIC_INTERVAL_SEC),
+ reportStatisticIntervalSec);
+ this.updateMetaDataIntervalSec =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.UPDATE_META_DATA_INTERVAL_SEC),
+ updateMetaDataIntervalSec);
+ this.ackTimeoutSec =
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.ACK_TIMEOUT_SEC),
ackTimeoutSec);
+ this.cleanOldConsumerIntervalSec =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CLEAN_OLD_CONSUMER_INTERVAL_SEC),
+ cleanOldConsumerIntervalSec);
+
+ String strPrometheusEnabled =
+
sortSdkParams.getOrDefault(ConfigConstants.IS_PROMETHEUS_ENABLED,
Boolean.TRUE.toString());
this.isPrometheusEnabled =
StringUtils.equalsIgnoreCase(strPrometheusEnabled, Boolean.TRUE.toString());
- this.emptyPollSleepStepMs =
NumberUtils.toInt(sortSdkParams.get("emptyPollSleepStepMs"),
emptyPollSleepStepMs);
- this.maxEmptyPollSleepMs =
NumberUtils.toInt(sortSdkParams.get("maxEmptyPollSleepMs"),
maxEmptyPollSleepMs);
- this.emptyPollTimes =
NumberUtils.toInt(sortSdkParams.get("emptyPollTimes"), emptyPollTimes);
- this.maxConsumerSize =
NumberUtils.toInt(sortSdkParams.get("maxConsumerSize"), maxConsumerSize);
+ this.emptyPollSleepStepMs =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_SLEEP_STEP_MS),
emptyPollSleepStepMs);
+ this.maxEmptyPollSleepMs =
+
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_EMPTY_POLL_SLEEP_MS),
maxEmptyPollSleepMs);
+ this.emptyPollTimes =
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_TIMES),
emptyPollTimes);
+ this.maxConsumerSize =
NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_CONSUMER_SIZE),
maxConsumerSize);
}
}