This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fccece5 [INLONG-1890] Inlong-Sort-Standalone add
sort-standalone-common module. (#1923)
fccece5 is described below
commit fccece5c05bc96444f887d6cca2cd9e5f85cf687
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Dec 8 15:16:18 2021 +0800
[INLONG-1890] Inlong-Sort-Standalone add sort-standalone-common module.
(#1923)
---
inlong-sort-standalone/pom.xml | 30 +++
.../sort-standalone-common/pom.xml | 33 +++
.../config/holder/CommonPropertiesHolder.java | 164 +++++++++++++++
.../config/holder/SortClusterConfigHolder.java | 154 ++++++++++++++
.../ClassResourceCommonPropertiesLoader.java | 69 +++++++
.../ClassResourceSortClusterConfigLoader.java | 67 ++++++
.../config/loader/CommonPropertiesLoader.java | 34 +++
.../loader/ManagerSortClusterConfigLoader.java | 120 +++++++++++
.../config/loader/SortClusterConfigLoader.java | 38 ++++
.../standalone/config/pojo/CacheClusterConfig.java | 68 ++++++
.../sort/standalone/config/pojo/InlongId.java | 50 +++++
.../standalone/config/pojo/SortClusterConfig.java | 68 ++++++
.../standalone/config/pojo/SortClusterRequest.java | 83 ++++++++
.../config/pojo/SortClusterResponse.java | 215 +++++++++++++++++++
.../standalone/config/pojo/SortTaskConfig.java | 230 +++++++++++++++++++++
.../standalone/config/pojo/type/CacheType.java | 70 +++++++
.../sort/standalone/config/pojo/type/DataType.java | 70 +++++++
.../sort/standalone/config/pojo/type/SortType.java | 72 +++++++
.../sort/standalone/metrics/MetricItemValue.java | 73 +++++++
.../sort/standalone/metrics/MetricListener.java | 39 ++++
.../standalone/metrics/MetricListenerRunnable.java | 132 ++++++++++++
.../sort/standalone/metrics/MetricObserver.java | 114 ++++++++++
.../sort/standalone/metrics/SortMetricItem.java | 120 +++++++++++
.../sort/standalone/metrics/SortMetricItemSet.java | 75 +++++++
.../inlong/sort/standalone/utils/BufferQueue.java | 167 +++++++++++++++
.../inlong/sort/standalone/utils/Constants.java | 35 ++++
.../sort/standalone/utils/InlongLoggerFactory.java | 85 ++++++++
.../sort/standalone/utils/SizeSemaphore.java | 126 +++++++++++
28 files changed, 2601 insertions(+)
diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index 09515fb..c329ff1 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -32,6 +32,7 @@
<name>Apache InLong - Sort - Standalone</name>
<modules>
+ <module>sort-standalone-common</module>
<module>sort-standalone-source</module>
<module>sort-standalone-dist</module>
</modules>
@@ -42,6 +43,12 @@
<flume.version>1.9.0</flume.version>
<plugin.assembly.version>3.2.0</plugin.assembly.version>
<pulsar.version>2.7.2</pulsar.version>
+ <junit.version>4.13</junit.version>
+ <guava.version>19.0</guava.version>
+ <skipTests>false</skipTests>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
</properties>
<dependencies>
@@ -70,6 +77,29 @@
<artifactId>inlong-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>2.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>2.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-common/pom.xml
b/inlong-sort-standalone/sort-standalone-common/pom.xml
new file mode 100644
index 0000000..4c99a28
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+ license agreements. See the NOTICE file distributed with this work for
additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-sort-standalone</artifactId>
+ <version>0.12.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <name>Apache InLong - SortStandalone Common</name>
+ <artifactId>sort-standalone-common</artifactId>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
+ </properties>
+
+ <dependencies>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
new file mode 100644
index 0000000..6006ccf
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.flume.Context;
+import
org.apache.inlong.sort.standalone.config.loader.ClassResourceCommonPropertiesLoader;
+import org.apache.inlong.sort.standalone.config.loader.CommonPropertiesLoader;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ *
+ * CommonPropertiesHolder
+ */
+public class CommonPropertiesHolder {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(CommonPropertiesHolder.class);
+ public static final String KEY_COMMON_PROPERTIES =
"common-properties-loader";
+ public static final String DEFAULT_LOADER =
ClassResourceCommonPropertiesLoader.class.getName();
+ public static final String KEY_CLUSTER_ID = "clusterId";
+
+ private static Map<String, String> props;
+ private static Context context;
+
+ /**
+ * init
+ */
+ private static void init() {
+ synchronized (KEY_COMMON_PROPERTIES) {
+ if (props == null) {
+ props = new ConcurrentHashMap<>();
+ String loaderClassName = System.getenv(KEY_COMMON_PROPERTIES);
+ loaderClassName = (loaderClassName == null) ? DEFAULT_LOADER :
loaderClassName;
+ try {
+ Class<?> loaderClass =
ClassUtils.getClass(loaderClassName);
+ Object loaderObject =
loaderClass.getDeclaredConstructor().newInstance();
+ if (loaderObject instanceof CommonPropertiesLoader) {
+ CommonPropertiesLoader loader =
(CommonPropertiesLoader) loaderObject;
+ props.putAll(loader.load());
+ LOG.info("loaderClass:{},properties:{}",
loaderClassName, props);
+ }
+ } catch (Throwable t) {
+ LOG.error("Fail to init
CommonPropertiesLoader,loaderClass:{},error:{}",
+ loaderClassName, t.getMessage());
+ LOG.error(t.getMessage(), t);
+ }
+ context = new Context(props);
+ }
+ }
+ }
+
+ /**
+ * get props
+ *
+ * @return the props
+ */
+ public static Map<String, String> get() {
+ if (props != null) {
+ return props;
+ }
+ init();
+ return props;
+ }
+
+ /**
+ * get context
+ *
+ * @return the context
+ */
+ public static Context getContext() {
+ if (context != null) {
+ return context;
+ }
+ init();
+ return context;
+ }
+
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ *
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
+ public static String getString(String key, String defaultValue) {
+ return get().getOrDefault(key, defaultValue);
+ }
+
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public static String getString(String key) {
+ return get().get(key);
+ }
+
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ *
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
+ public static Long getLong(String key, Long defaultValue) {
+ return NumberUtils.toLong(get().get(key), defaultValue);
+ }
+
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public static Long getLong(String key) {
+ String strValue = get().get(key);
+ Long value = (strValue == null) ? null :
NumberUtils.toLong(get().get(key));
+ return value;
+ }
+
+ /**
+ * getStringFromContext
+ *
+ * @param context
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ public static String getStringFromContext(Context context, String key,
String defaultValue) {
+ String value = context.getString(key);
+ value = (value != null) ? value : props.getOrDefault(key,
defaultValue);
+ return value;
+ }
+
+ /**
+ * getClusterId
+ *
+ * @return
+ */
+ public static String getClusterId() {
+ return getString(KEY_CLUSTER_ID);
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
new file mode 100644
index 0000000..67b4b6a
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+import static
org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader.SORT_CLUSTER_CONFIG_TYPE;
+import static
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import
org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader;
+import org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ *
+ * SortClusterConfigHolder
+ */
+public final class SortClusterConfigHolder {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
+
+ private static SortClusterConfigHolder instance;
+
+ private long reloadInterval;
+ private Timer reloadTimer;
+ private SortClusterConfigLoader loader;
+ private SortClusterConfig config;
+
+ /**
+ * Constructor
+ */
+ private SortClusterConfigHolder() {
+
+ }
+
+ /**
+ * getInstance
+ *
+ * @return
+ */
+ private static SortClusterConfigHolder get() {
+ if (instance != null) {
+ return instance;
+ }
+ synchronized (SortClusterConfigHolder.class) {
+ instance = new SortClusterConfigHolder();
+ instance.reloadInterval =
CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
+ String loaderType =
CommonPropertiesHolder.getString(SORT_CLUSTER_CONFIG_TYPE,
+ ClassResourceSortClusterConfigLoader.class.getName());
+ try {
+ Class<?> loaderClass = ClassUtils.getClass(loaderType);
+ Object loaderObject =
loaderClass.getDeclaredConstructor().newInstance();
+ if (loaderObject instanceof SortClusterConfigLoader) {
+ instance.loader = (SortClusterConfigLoader) loaderObject;
+ }
+ } catch (Throwable t) {
+ LOG.error("Fail to init loader,loaderType:{},error:{}",
loaderType, t.getMessage());
+ LOG.error(t.getMessage(), t);
+ }
+ if (instance.loader == null) {
+ instance.loader = new ClassResourceSortClusterConfigLoader();
+ }
+ try {
+ instance.loader.configure(new
Context(CommonPropertiesHolder.get()));
+ instance.reload();
+ instance.setReloadTimer();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * setReloadTimer
+ */
+ private void setReloadTimer() {
+ reloadTimer = new Timer(true);
+ TimerTask task = new TimerTask() {
+
+ /**
+ * run
+ */
+ public void run() {
+ reload();
+ }
+ };
+ reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
+ }
+
+ /**
+ * reload
+ */
+ private void reload() {
+ try {
+ SortClusterConfig newConfig = this.loader.load();
+ if (newConfig != null) {
+ this.config = newConfig;
+ }
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * getClusterConfig
+ *
+ * @return
+ */
+ public static SortClusterConfig getClusterConfig() {
+ return get().config;
+ }
+
+ /**
+ * getTaskConfig
+ *
+ * @param sortTaskName
+ * @return
+ */
+ public static SortTaskConfig getTaskConfig(String sortTaskName) {
+ SortClusterConfig config = get().config;
+ if (config != null) {
+ for (SortTaskConfig task : config.getSortTasks()) {
+ if (StringUtils.equals(sortTaskName, task.getName())) {
+ return task;
+ }
+ }
+ }
+ return null;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceCommonPropertiesLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceCommonPropertiesLoader.java
new file mode 100644
index 0000000..81d4dd0
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceCommonPropertiesLoader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ *
+ * FileCommonPropertiesLoader
+ */
+public class ClassResourceCommonPropertiesLoader implements
CommonPropertiesLoader {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+ public static final String FILENAME = "common.properties";
+
+ /**
+ * load
+ *
+ * @return
+ */
+ @Override
+ public Map<String, String> load() {
+ return this.loadProperties(FILENAME);
+ }
+
+ /**
+ * loadProperties
+ *
+ * @param fileName
+ * @return
+ */
+ protected Map<String, String> loadProperties(String fileName) {
+ Map<String, String> result = new ConcurrentHashMap<>();
+ try (InputStream inStream =
getClass().getClassLoader().getResource(fileName).openStream()) {
+ Properties props = new Properties();
+ props.load(inStream);
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ result.put((String) entry.getKey(), (String) entry.getValue());
+ }
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("fail to load properties, file ={}, and e= {}",
fileName, e);
+ } catch (Exception e) {
+ LOG.error("fail to load properties, file ={}, and e= {}",
fileName, e);
+ }
+ return result;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
new file mode 100644
index 0000000..6193680
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.google.gson.Gson;
+
+/**
+ *
+ * ClassResourceCommonPropertiesLoader
+ */
+public class ClassResourceSortClusterConfigLoader implements
SortClusterConfigLoader {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+ public static final String FILENAME = "SortClusterConfig.conf";
+
+ /**
+ * load
+ *
+ * @return
+ */
+ @Override
+ public SortClusterConfig load() {
+ try {
+ String confString =
IOUtils.toString(getClass().getClassLoader().getResource(FILENAME));
+ Gson gson = new Gson();
+ SortClusterConfig config = gson.fromJson(confString,
SortClusterConfig.class);
+ return config;
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("fail to load properties, file ={}, and e= {}",
FILENAME, e);
+ } catch (Exception e) {
+ LOG.error("fail to load properties, file ={}, and e= {}",
FILENAME, e);
+ }
+ return new SortClusterConfig();
+ }
+
+ /**
+ * configure
+ *
+ * @param context
+ */
+ @Override
+ public void configure(Context context) {
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesLoader.java
new file mode 100644
index 0000000..b0a6c50
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesLoader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.util.Map;
+
+/**
+ *
+ * CommonPropertiesLoader
+ */
+public interface CommonPropertiesLoader {
+
+ /**
+ * load
+ *
+ * @return
+ */
+ Map<String, String> load();
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
new file mode 100644
index 0000000..249b5a4
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterResponse;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ *
+ * ManagerSortClusterConfigLoader
+ */
+public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader
{
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+
+ private Context context;
+ private CloseableHttpClient httpClient;
+ private Gson gson = new Gson();
+ private String md5;
+
+ /**
+ * constructHttpClient
+ *
+ * @return
+ */
+ private static synchronized CloseableHttpClient constructHttpClient() {
+ long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout((int) timeoutInMs)
+ .setSocketTimeout((int) timeoutInMs).build();
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+ httpClientBuilder.setDefaultRequestConfig(requestConfig);
+ return httpClientBuilder.build();
+ }
+
+ /**
+ * configure
+ *
+ * @param context
+ */
+ @Override
+ public void configure(Context context) {
+ this.context = context;
+ this.httpClient = constructHttpClient();
+ }
+
+ /**
+ * load
+ *
+ * @return
+ */
+ @Override
+ public SortClusterConfig load() {
+ HttpGet httpGet = null;
+ try {
+ String clusterName =
this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+ String url = this.context.getString(SORT_CLUSTER_CONFIG_MANAGER) +
"?apiVersion=1.0&clusterName="
+ + clusterName + "&md5=";
+ if (StringUtils.isNotBlank(this.md5)) {
+ url += this.md5;
+ }
+ LOG.info("start to request {} to get config info", url);
+ httpGet = new HttpGet(url);
+ httpGet.addHeader(HttpHeaders.CONNECTION, "close");
+
+ // request with get
+ CloseableHttpResponse response = httpClient.execute(httpGet);
+ String returnStr = EntityUtils.toString(response.getEntity());
+ LOG.info("end to request {},result:{}", url, returnStr);
+ // get groupId <-> topic and m value.
+
+ SortClusterResponse clusterResponse = gson.fromJson(returnStr,
SortClusterResponse.class);
+ if (!clusterResponse.isResult()) {
+ LOG.info("Fail to get config info from url:{}, error code is
{}", url, clusterResponse.getErrCode());
+ return null;
+ }
+
+ this.md5 = clusterResponse.getMd5();
+ return clusterResponse.getData();
+ } catch (Exception ex) {
+ LOG.error("exception caught", ex);
+ return null;
+ } finally {
+ if (httpGet != null) {
+ httpGet.releaseConnection();
+ }
+ }
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
new file mode 100644
index 0000000..333c33d
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+
+/**
+ *
+ * SortClusterConfigLoader
+ */
+public interface SortClusterConfigLoader extends Configurable {
+
+ String SORT_CLUSTER_CONFIG_TYPE = "sortClusterConfig.type";
+ String SORT_CLUSTER_CONFIG_MANAGER = "sortClusterConfig.managerUrl";
+
+ /**
+ * load
+ *
+ * @return
+ */
+ SortClusterConfig load();
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
new file mode 100644
index 0000000..97b8c05
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * CacheClusterConfig
+ */
+public class CacheClusterConfig {
+
+ private String clusterName;
+ private Map<String, String> params = new HashMap<>();
+
+ /**
+ * get clusterName
+ *
+ * @return the clusterName
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * set clusterName
+ *
+ * @param clusterName the clusterName to set
+ */
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * get params
+ *
+ * @return the params
+ */
+ public Map<String, String> getParams() {
+ return params;
+ }
+
+ /**
+ * set params
+ *
+ * @param params the params to set
+ */
+ public void setParams(Map<String, String> params) {
+ this.params = params;
+ }
+
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/InlongId.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/InlongId.java
new file mode 100644
index 0000000..99d9aa9
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/InlongId.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ *
+ * InlongId
+ */
+public class InlongId {
+
+ /**
+ * generateUid
+ *
+ * @param inlongGroupId
+ * @param inlongStreamId
+ * @return
+ */
+ public static String generateUid(String inlongGroupId, String
inlongStreamId) {
+ if (StringUtils.isBlank(inlongGroupId)) {
+ if (StringUtils.isBlank(inlongStreamId)) {
+ return "";
+ } else {
+ return inlongStreamId;
+ }
+ } else {
+ if (StringUtils.isBlank(inlongStreamId)) {
+ return inlongGroupId;
+ } else {
+ return inlongGroupId + "." + inlongStreamId;
+ }
+ }
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
new file mode 100644
index 0000000..a720ae0
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ * SortClusterConfig
+ */
+public class SortClusterConfig {
+
+ private String clusterName;
+ private List<SortTaskConfig> sortTasks = new ArrayList<>();
+
+ /**
+ * get clusterName
+ *
+ * @return the clusterName
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * set clusterName
+ *
+ * @param clusterName the clusterName to set
+ */
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * get sortTasks
+ *
+ * @return the sortTasks
+ */
+ public List<SortTaskConfig> getSortTasks() {
+ return sortTasks;
+ }
+
+ /**
+ * set sortTasks
+ *
+ * @param sortTasks the sortTasks to set
+ */
+ public void setSortTasks(List<SortTaskConfig> sortTasks) {
+ this.sortTasks = sortTasks;
+ }
+
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterRequest.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterRequest.java
new file mode 100644
index 0000000..19ad62b
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterRequest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+/**
+ *
+ * SortClusterRequest
+ */
+public class SortClusterRequest {
+
+ private String clusterName;
+ private String md5;
+ private String apiVersion = "1.0";
+
+ /**
+ * get clusterName
+ *
+ * @return the clusterName
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * set clusterName
+ *
+ * @param clusterName the clusterName to set
+ */
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * get md5
+ *
+ * @return the md5
+ */
+ public String getMd5() {
+ return md5;
+ }
+
+ /**
+ * set md5
+ *
+ * @param md5 the md5 to set
+ */
+ public void setMd5(String md5) {
+ this.md5 = md5;
+ }
+
+ /**
+ * get apiVersion
+ *
+ * @return the apiVersion
+ */
+ public String getApiVersion() {
+ return apiVersion;
+ }
+
+ /**
+ * set apiVersion
+ *
+ * @param apiVersion the apiVersion to set
+ */
+ public void setApiVersion(String apiVersion) {
+ this.apiVersion = apiVersion;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
new file mode 100644
index 0000000..a04c24f
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+/**
+ *
+ * SortClusterResponse
+ */
+public class SortClusterResponse {
+
+ public static final int SUCC = 0;
+ public static final int NOUPDATE = 1;
+ public static final int FAIL = -1;
+ public static final int REQ_PARAMS_ERROR = -101;
+
+ private boolean result;
+ private int errCode;
+ private String md5;
+ private SortClusterConfig data;
+
+ /**
+ * get result
+ *
+ * @return the result
+ */
+ public boolean isResult() {
+ return result;
+ }
+
+ /**
+ * set result
+ *
+ * @param result the result to set
+ */
+ public void setResult(boolean result) {
+ this.result = result;
+ }
+
+ /**
+ * get errCode
+ *
+ * @return the errCode
+ */
+ public int getErrCode() {
+ return errCode;
+ }
+
+ /**
+ * set errCode
+ *
+ * @param errCode the errCode to set
+ */
+ public void setErrCode(int errCode) {
+ this.errCode = errCode;
+ }
+
+ /**
+ * get md5
+ *
+ * @return the md5
+ */
+ public String getMd5() {
+ return md5;
+ }
+
+ /**
+ * set md5
+ *
+ * @param md5 the md5 to set
+ */
+ public void setMd5(String md5) {
+ this.md5 = md5;
+ }
+
+ /**
+ * get data
+ *
+ * @return the data
+ */
+ public SortClusterConfig getData() {
+ return data;
+ }
+
+ /**
+ * set data
+ *
+ * @param data the data to set
+ */
+ public void setData(SortClusterConfig data) {
+ this.data = data;
+ }
+//
+// /**
+// * generateTdbankConfig
+// *
+// * @return
+// */
+// public static SortClusterConfig generateTdbankConfig() {
+// SortClusterConfig clusterConfig = new SortClusterConfig();
+// clusterConfig.setClusterName("tdbankv3-sz-sz1");
+// //
+// List<SortTaskConfig> sortTasks = new ArrayList<>();
+// clusterConfig.setSortTasks(sortTasks);
+// SortTaskConfig taskConfig = new SortTaskConfig();
+// sortTasks.add(taskConfig);
+// taskConfig.setName("sid_tdbank_atta6th_v3");
+// taskConfig.setType(SortType.TQTDBANK);
+// //
+// Map<String, String> sinkParams = new HashMap<>();
+// taskConfig.setSinkParams(sinkParams);
+// sinkParams.put("b_pcg_venus_szrecone_124_153_utf8",
"10.56.15.195:46801,10.56.15.212:46801,"
+// + "10.56.15.220:46801,10.56.15.221:46801,"
+// +
"10.56.15.230:46801,10.56.16.20:46801,10.56.16.38:46801,10.56.20.21:46801,10.56.20.80:46801,"
+// +
"10.56.20.85:46801,10.56.209.205:46801,10.56.21.17:46801,10.56.21.20:46801,10.56.21.79:46801,"
+// +
"10.56.21.85:46801,10.56.81.205:46801,10.56.81.211:46801,10.56.82.11:46801,10.56.82.12:46801,"
+// +
"10.56.82.37:46801,10.56.82.38:46801,10.56.82.40:46801,10.56.83.143:46801,10.56.83.80:46801,"
+// + "10.56.84.17:46801");
+// //
+// List<Map<String, String>> idParams = new ArrayList<>();
+// Map<String, String> idParam = new HashMap<>();
+// idParams.add(idParam);
+// taskConfig.setIdParams(idParams);
+// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
+// idParam.put(Constants.INLONG_STREAM_ID, "");
+// idParam.put(TdbankConfig.KEY_BID,
"b_pcg_venus_szrecone_124_153_utf8");
+// idParam.put(TdbankConfig.KEY_TID, "t_sh_atta_v2_0fc00000046");
+// idParam.put(TdbankConfig.KEY_DATA_TYPE,
TdbankConfig.DATA_TYPE_ATTA_TEXT);
+// return clusterConfig;
+// }
+//
+// /**
+// * generateCdmqConfig
+// *
+// * @return
+// */
+// public static SortClusterConfig generateCdmqConfig() {
+// SortClusterConfig clusterConfig = new SortClusterConfig();
+// clusterConfig.setClusterName("cdmqv3-sz-sz1");
+// //
+// List<SortTaskConfig> sortTasks = new ArrayList<>();
+// clusterConfig.setSortTasks(sortTasks);
+// SortTaskConfig taskConfig = new SortTaskConfig();
+// sortTasks.add(taskConfig);
+// taskConfig.setName("sid_cdmq_kg_videorequest_v3");
+// taskConfig.setType(SortType.CDMQ);
+// //
+// Map<String, String> sinkParams = new HashMap<>();
+// taskConfig.setSinkParams(sinkParams);
+// sinkParams.put("cdmqAccessPoint",
"cdmqszentry01.data.mig:10005,cdmqszentry05.data.mig:10033");
+// sinkParams.put("cdmqClusterId", "kg_videorequest");
+// sinkParams.put("clientId", "p_video_atta_196");
+// sinkParams.put("batchSize", "122880");
+// sinkParams.put("maxRequestSize", "8388608");
+// sinkParams.put("lingerMs", "150");
+// //
+// List<Map<String, String>> idParams = new ArrayList<>();
+// Map<String, String> idParam = new HashMap<>();
+// idParams.add(idParam);
+// taskConfig.setIdParams(idParams);
+// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
+// idParam.put(Constants.TOPIC, "U_TOPIC_0fc00000046");
+// return clusterConfig;
+// }
+//
+// /**
+// * main
+// *
+// * @param args
+// */
+// public static void main(String[] args) {
+// // tdbank
+// {
+// SortClusterConfig config = generateTdbankConfig();
+// String configString = JSON.toJSONString(config, false);
+// System.out.println("tdbank:" + configString);
+// String md5 = DigestUtils.md5Hex(configString);
+// SortClusterResponse response = new SortClusterResponse();
+// response.setResult(true);
+// response.setErrCode(SUCC);
+// response.setMd5(md5);
+// response.setData(config);
+// String responseString = JSON.toJSONString(response, true);
+// System.out.println("tdbank responseString:" + responseString);
+// }
+// // cdmq
+// {
+// SortClusterConfig config = generateCdmqConfig();
+// String configString = JSON.toJSONString(config, false);
+// System.out.println("cdmq:" + configString);
+// String md5 = DigestUtils.md5Hex(configString);
+// SortClusterResponse response = new SortClusterResponse();
+// response.setResult(true);
+// response.setErrCode(SUCC);
+// response.setMd5(md5);
+// response.setData(config);
+// String responseString = JSON.toJSONString(response, true);
+// System.out.println("cdmq responseString:" + responseString);
+// }
+// }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
new file mode 100644
index 0000000..ca99a4b
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.type.SortType;
+
+/**
+ *
+ * SortTaskConfig
+ */
+public class SortTaskConfig {
+
+ public static final String KEY_TASK_NAME = "taskName";
+ public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
+ public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
+ public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
+
+ private String name;
+ private SortType type;
+ private List<Map<String, String>> idParams = new ArrayList<>();
+ private Map<String, String> sinkParams = new HashMap<>();
+
+ /**
+ * get name
+ *
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * set name
+ *
+ * @param name the name to set
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * get type
+ *
+ * @return the type
+ */
+ public SortType getType() {
+ return type;
+ }
+
+ /**
+ * set type
+ *
+ * @param type the type to set
+ */
+ public void setType(SortType type) {
+ this.type = type;
+ }
+
+ /**
+ * get idParams
+ *
+ * @return the idParams
+ */
+ public List<Map<String, String>> getIdParams() {
+ return idParams;
+ }
+
+ /**
+ * set idParams
+ *
+ * @param idParams the idParams to set
+ */
+ public void setIdParams(List<Map<String, String>> idParams) {
+ this.idParams = idParams;
+ }
+
+ /**
+ * get sinkParams
+ *
+ * @return the sinkParams
+ */
+ public Map<String, String> getSinkParams() {
+ return sinkParams;
+ }
+
+ /**
+ * set sinkParams
+ *
+ * @param sinkParams the sinkParams to set
+ */
+ public void setSinkParams(Map<String, String> sinkParams) {
+ this.sinkParams = sinkParams;
+ }
+
+ /**
+ * getFlumeConfProperties
+ *
+ * @return Map
+ */
+ public Map<String, String> generateFlumeConfiguration() {
+ Map<String, String> flumeConf = new HashMap<>();
+ // channels
+ this.appendChannels(flumeConf);
+ // sinks
+ this.appendSinks(flumeConf);
+ // sources
+ this.appendSources(flumeConf);
+ return flumeConf;
+ }
+
+ /**
+ * appendChannels
+ *
+ * @param flumeConf
+ */
+ private void appendChannels(Map<String, String> flumeConf) {
+ StringBuilder builder = new StringBuilder();
+ String channelName = name + "Channel";
+ flumeConf.put(name + ".channels", channelName);
+ String prefix =
builder.append(name).append(".channels.").append(channelName).append(".").toString();
+ builder.setLength(0);
+ String channelType = builder.append(prefix).append("type").toString();
+ String channelClass =
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
+ flumeConf.put(channelType, channelClass);
+ this.appendCommon(flumeConf, prefix, null);
+ }
+
+ /**
+ * appendCommon
+ *
+ * @param flumeConf
+ * @param prefix
+ * @param componentParams
+ */
+ private void appendCommon(Map<String, String> flumeConf, String prefix,
Map<String, String> componentParams) {
+ StringBuilder builder = new StringBuilder();
+ String taskName =
builder.append(prefix).append(KEY_TASK_NAME).toString();
+ flumeConf.put(taskName, name);
+ // CommonProperties
+ for (Entry<String, String> entry :
CommonPropertiesHolder.get().entrySet()) {
+ builder.setLength(0);
+ String key =
builder.append(prefix).append(entry.getKey()).toString();
+ flumeConf.put(key, entry.getValue());
+ }
+ // componentParams
+ if (componentParams != null) {
+ for (Entry<String, String> entry : componentParams.entrySet()) {
+ builder.setLength(0);
+ String key =
builder.append(prefix).append(entry.getKey()).toString();
+ flumeConf.put(key, entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * appendSinks
+ *
+ * @param flumeConf
+ */
+ private void appendSinks(Map<String, String> flumeConf) {
+ // sinks
+ String sinkName = name + "Sink";
+ flumeConf.put(name + ".sinks", sinkName);
+ StringBuilder builder = new StringBuilder();
+ String prefix =
builder.append(name).append(".sinks.").append(sinkName).append(".").toString();
+ // type
+ builder.setLength(0);
+ String sinkType = builder.append(prefix).append("type").toString();
+ String sinkClass =
CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
+ flumeConf.put(sinkType, sinkClass);
+ // channel
+ builder.setLength(0);
+ String channelKey =
builder.append(prefix).append("channel").toString();
+ String channelName = name + "Channel";
+ flumeConf.put(channelKey, channelName);
+ //
+ this.appendCommon(flumeConf, prefix, sinkParams);
+ }
+
+ /**
+ * appendSources
+ *
+ * @param flumeConf
+ */
+ private void appendSources(Map<String, String> flumeConf) {
+ // sources
+ String sourceName = name + "Source";
+ flumeConf.put(name + ".sources", sourceName);
+ StringBuilder builder = new StringBuilder();
+ String prefix =
builder.append(name).append(".sources.").append(sourceName).append(".").toString();
+ // type
+ builder.setLength(0);
+ String sourceType = builder.append(prefix).append("type").toString();
+ String sourceClass =
CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
+ flumeConf.put(sourceType, sourceClass);
+ // channel
+ builder.setLength(0);
+ String channelKey =
builder.append(prefix).append("channels").toString();
+ String channelName = name + "Channel";
+ flumeConf.put(channelKey, channelName);
+ // selector.type
+ builder.setLength(0);
+ String selectorTypeKey =
builder.append(prefix).append("selector.type").toString();
+ flumeConf.put(selectorTypeKey,
"org.apache.flume.channel.ReplicatingChannelSelector");
+ //
+ this.appendCommon(flumeConf, prefix, null);
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
new file mode 100644
index 0000000..963dc6e
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo.type;
+
+/**
+ * cache cluster type
+ */
+public enum CacheType {
+
+ TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
+
+ private final String value;
+
+ /**
+ *
+ * Constructor
+ *
+ * @param value
+ */
+ private CacheType(String value) {
+ this.value = value;
+ }
+
+ /**
+ * value
+ *
+ * @return
+ */
+ public String value() {
+ return this.value;
+ }
+
+ /**
+ * toString
+ */
+ @Override
+ public String toString() {
+ return this.name() + ":" + this.value;
+ }
+
+ /**
+ * convert
+ *
+ * @param value
+ * @return
+ */
+ public static CacheType convert(String value) {
+ for (CacheType v : values()) {
+ if (v.value().equals(value)) {
+ return v;
+ }
+ }
+ return N;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
new file mode 100644
index 0000000..882e29e
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo.type;
+
+/**
+ * data content type
+ */
+public enum DataType {
+
+ TEXT("text"), PB("pb"), JCE("jce"), N("n");
+
+ private final String value;
+
+ /**
+ *
+ * Constructor
+ *
+ * @param value
+ */
+ private DataType(String value) {
+ this.value = value;
+ }
+
+ /**
+ * value
+ *
+ * @return
+ */
+ public String value() {
+ return this.value;
+ }
+
+ /**
+ * toString
+ */
+ @Override
+ public String toString() {
+ return this.name() + ":" + this.value;
+ }
+
+ /**
+ * convert
+ *
+ * @param value
+ * @return
+ */
+ public static DataType convert(String value) {
+ for (DataType v : values()) {
+ if (v.value().equals(value)) {
+ return v;
+ }
+ }
+ return N;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
new file mode 100644
index 0000000..e066928
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo.type;
+
+/**
+ *
+ * SortType
+ */
+public enum SortType {
+
+ HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"),
ElasticSearch("ElasticSearch"), THTDBANK(
+ "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), N("n");
+
+ private final String value;
+
+ /**
+ *
+ * Constructor
+ *
+ * @param value
+ */
+ private SortType(String value) {
+ this.value = value;
+ }
+
+ /**
+ * value
+ *
+ * @return
+ */
+ public String value() {
+ return this.value;
+ }
+
+ /**
+ * toString
+ */
+ @Override
+ public String toString() {
+ return this.name() + ":" + this.value;
+ }
+
+ /**
+ * convert
+ *
+ * @param value
+ * @return
+ */
+ public static SortType convert(String value) {
+ for (SortType v : values()) {
+ if (v.value().equals(value)) {
+ return v;
+ }
+ }
+ return N;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
new file mode 100644
index 0000000..0f55fc1
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricValue;
+
+/**
+ *
+ * MetricItemValue
+ */
+public class MetricItemValue {
+
+ private final String key;
+ private final Map<String, String> dimensions;
+ private final Map<String, MetricValue> metrics;
+
+ /**
+ * Constructor
+ *
+ * @param key
+ * @param dimensions
+ * @param metrics
+ */
+ public MetricItemValue(String key, Map<String, String> dimensions,
Map<String, MetricValue> metrics) {
+ this.key = key;
+ this.dimensions = dimensions;
+ this.metrics = metrics;
+ }
+
+ /**
+ * get key
+ *
+ * @return the key
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * get dimensions
+ *
+ * @return the dimensions
+ */
+ public Map<String, String> getDimensions() {
+ return dimensions;
+ }
+
+ /**
+ * get metrics
+ *
+ * @return the metrics
+ */
+ public Map<String, MetricValue> getMetrics() {
+ return metrics;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
new file mode 100644
index 0000000..95dad6a
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.List;
+
+/**
+ *
+ * MetricListener
+ */
+public interface MetricListener {
+
+ String KEY_METRIC_DOMAINS = "metricDomains";
+ String KEY_DOMAIN_LISTENERS = "domainListeners";
+ String KEY_SNAPSHOT_INTERVAL = "snapshotInterval";
+
+ /**
+ * snapshot
+ *
+ * @param domain
+ * @param itemValues
+ */
+ public void snapshot(String domain, List<MetricItemValue> itemValues);
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
new file mode 100644
index 0000000..0cdf29f
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ *
+ * MetricListenerRunnable
+ */
+public class MetricListenerRunnable implements Runnable {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(MetricObserver.class);
+
+ private String domain;
+ private List<MetricListener> listenerList;
+
+ /**
+ * Constructor
+ *
+ * @param domain
+ * @param listenerList
+ */
+ public MetricListenerRunnable(String domain, List<MetricListener>
listenerList) {
+ this.domain = domain;
+ this.listenerList = listenerList;
+ }
+
+ /**
+ * run
+ */
+ @Override
+ public void run() {
+ LOG.info("begin to snapshot metric:{}", domain);
+ try {
+ List<MetricItemValue> itemValues = this.getItemValues();
+ LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
+ this.listenerList.forEach((item) -> {
+ item.snapshot(domain, itemValues);
+ });
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ LOG.info("end to snapshot metric:{}", domain);
+ }
+
+ /**
+ * getItemValues
+ *
+ * @return MetricItemValue List
+ * @throws InstanceNotFoundException
+ * @throws AttributeNotFoundException
+ * @throws ReflectionException
+ * @throws MBeanException
+ * @throws MalformedObjectNameException
+ * @throws ClassNotFoundException
+ */
+ @SuppressWarnings("unchecked")
+ private List<MetricItemValue> getItemValues() throws
InstanceNotFoundException, AttributeNotFoundException,
+ ReflectionException, MBeanException, MalformedObjectNameException,
ClassNotFoundException {
+ ObjectName objName = new ObjectName(domain +
MetricItemMBean.DOMAIN_SEPARATOR + "*");
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
+ LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
+ List<MetricItemValue> itemValues = new ArrayList<>();
+ for (ObjectInstance mbean : mbeans) {
+ String className = mbean.getClassName();
+ Class<?> clazz = ClassUtils.getClass(className);
+ if (ClassUtils.isAssignable(clazz, MetricItemMBean.class)) {
+ ObjectName metricObjectName = mbean.getObjectName();
+ String dimensionsKey = (String)
mbs.getAttribute(metricObjectName,
+ MetricItemMBean.ATTRIBUTE_KEY);
+ Map<String, String> dimensions = (Map<String, String>) mbs
+ .getAttribute(metricObjectName,
MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+ Map<String, MetricValue> metrics = (Map<String, MetricValue>)
mbs
+ .invoke(metricObjectName,
MetricItemMBean.METHOD_SNAPSHOT, null, null);
+ MetricItemValue itemValue = new MetricItemValue(dimensionsKey,
dimensions, metrics);
+ LOG.info("MetricItemMBean get itemValue:{}", itemValue);
+ itemValues.add(itemValue);
+ } else if (ClassUtils.isAssignable(clazz,
MetricItemSetMBean.class)) {
+ ObjectName metricObjectName = mbean.getObjectName();
+ List<MetricItem> items = (List<MetricItem>)
mbs.invoke(metricObjectName,
+ MetricItemMBean.METHOD_SNAPSHOT, null, null);
+ for (MetricItem item : items) {
+ String dimensionsKey = item.getDimensionsKey();
+ Map<String, String> dimensions = item.getDimensions();
+ Map<String, MetricValue> metrics = item.snapshot();
+ MetricItemValue itemValue = new
MetricItemValue(dimensionsKey, dimensions, metrics);
+ LOG.info("MetricItemSetMBean get itemValue:{}", itemValue);
+ itemValues.add(itemValue);
+ }
+ }
+ }
+ return itemValues;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
new file mode 100644
index 0000000..a7865a1
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ *
+ * MetricObserver
+ */
+public class MetricObserver {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(MetricObserver.class);
+ private static final AtomicBoolean isInited = new AtomicBoolean(false);
+ private static ScheduledExecutorService statExecutor =
Executors.newScheduledThreadPool(5);
+
+ /**
+ * init
+ *
+ * @param commonProperties
+ */
+ public static void init(Map<String, String> commonProperties) {
+ if (!isInited.compareAndSet(false, true)) {
+ return;
+ }
+ // init
+ Context context = new Context(commonProperties);
+ // get domain name list
+ String metricDomains =
context.getString(MetricListener.KEY_METRIC_DOMAINS);
+ if (StringUtils.isBlank(metricDomains)) {
+ return;
+ }
+ // split domain name
+ String[] domains = metricDomains.split("\\s+");
+ for (String domain : domains) {
+ // get domain parameters
+ Context domainContext = new Context(
+ context.getSubProperties(MetricListener.KEY_METRIC_DOMAINS
+ "." + domain + "."));
+ List<MetricListener> listenerList = parseDomain(domain,
domainContext);
+ // no listener
+ if (listenerList == null || listenerList.size() <= 0) {
+ continue;
+ }
+ // get snapshot interval
+ long snapshotInterval =
domainContext.getLong(MetricListener.KEY_SNAPSHOT_INTERVAL, 60000L);
+ LOG.info("begin to register
domain:{},MetricListeners:{},snapshotInterval:{}", domain, listenerList,
+ snapshotInterval);
+ statExecutor.scheduleWithFixedDelay(new
MetricListenerRunnable(domain, listenerList), snapshotInterval,
+ snapshotInterval, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ /**
+ * parseDomain
+ *
+ * @param domain
+ * @param context
+ * @return
+ */
+ private static List<MetricListener> parseDomain(String domain, Context
domainContext) {
+ String listeners =
domainContext.getString(MetricListener.KEY_DOMAIN_LISTENERS);
+ if (StringUtils.isBlank(listeners)) {
+ return null;
+ }
+ String[] listenerTypes = listeners.split("\\s+");
+ List<MetricListener> listenerList = new ArrayList<>();
+ for (String listenerType : listenerTypes) {
+ // new listener object
+ try {
+ Class<?> listenerClass = ClassUtils.getClass(listenerType);
+ Object listenerObject =
listenerClass.getDeclaredConstructor().newInstance();
+ if (listenerObject == null || !(listenerObject instanceof
MetricListener)) {
+ LOG.error("{} is not instance of MetricListener.",
listenerType);
+ continue;
+ }
+ final MetricListener listener = (MetricListener)
listenerObject;
+ listenerList.add(listener);
+ } catch (Throwable t) {
+ LOG.error("Fail to init MetricListener:{},error:{}",
+ listenerType, t.getMessage());
+ continue;
+ }
+ }
+ return listenerList;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
new file mode 100644
index 0000000..6120a42
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.Event;
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.sort.standalone.utils.Constants;
+
+/**
+ *
+ * SortMetricItem
+ */
+@MetricDomain(name = "Sort")
+public class SortMetricItem extends MetricItem {
+
+ public static final String KEY_CLUSTER_ID = "clusterId";// sortClusterId
+ public static final String KEY_TASK_NAME = "taskName";// sortTaskId
+ public static final String KEY_SOURCE_ID = "sourceId";// cacheClusterId
+ public static final String KEY_SOURCE_DATA_ID = "sourceDataId";// topic
+ public static final String KEY_INLONG_GROUP_ID = "inlongGroupId";
+ public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
+ public static final String KEY_SINK_ID = "sinkId";// sortDestinationId
+ public static final String KEY_SINK_DATA_ID = "sinkDataId";// topic or
dest ip
+ //
+ public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
+ public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
+ public static final String M_READ_FAIL_COUNT = "readFailCount";
+ public static final String M_READ_FAIL_SIZE = "readFailSize";
+ public static final String M_SEND_COUNT = "sendCount";
+ public static final String M_SEND_SIZE = "sendSize";
+ public static final String M_SEND_SUCCESS_COUNT = "sendSuccessCount";
+ public static final String M_SEND_SUCCESS_SIZE = "sendSuccessSize";
+ public static final String M_SEND_FAIL_COUNT = "sendFailCount";
+ public static final String M_SEND_FAIL_SIZE = "sendFailSize";
+ //
+ public static final String M_SINK_DURATION = "sinkDuration";
+ public static final String M_NODE_DURATION = "nodeDuration";
+ public static final String M_WHOLE_DURATION = "wholeDuration";
+
+ @Dimension
+ public String clusterId;
+ @Dimension
+ public String taskName;
+ @Dimension
+ public String sourceId;
+ @Dimension
+ public String sourceDataId;
+ @Dimension
+ public String inlongGroupId;
+ @Dimension
+ public String inlongStreamId;
+ @Dimension
+ public String sinkId;
+ @Dimension
+ public String sinkDataId;
+ @CountMetric
+ public AtomicLong readSuccessCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readSuccessSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readFailCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readFailSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSuccessCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSuccessSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendFailCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendFailSize = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - sinkBeginTime(milliseconds)
+ public AtomicLong sinkDuration = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - sourceReceiveTime(milliseconds)
+ public AtomicLong nodeDuration = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - eventCreateTime(milliseconds)
+ public AtomicLong wholeDuration = new AtomicLong(0);
+
+ /**
+ * fillInlongId
+ *
+ * @param event
+ * @param dimensions
+ */
+ public static void fillInlongId(Event event, Map<String, String>
dimensions) {
+ Map<String, String> headers = event.getHeaders();
+ String inlongGroupId = headers.getOrDefault(Constants.INLONG_GROUP_ID,
"-");
+ String inlongStreamId =
headers.getOrDefault(Constants.INLONG_STREAM_ID, "-");
+ dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+ dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItemSet.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItemSet.java
new file mode 100644
index 0000000..591d962
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItemSet.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemSet;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ *
+ * MetaSinkMetricItemSet
+ */
+@MetricDomain(name = "Sort")
+public class SortMetricItemSet extends MetricItemSet<SortMetricItem> {
+
+ public static final Logger LOG =
InlongLoggerFactory.getLogger(SortMetricItemSet.class);
+
+ /**
+ * Constructor
+ *
+ * @param name
+ */
+ public SortMetricItemSet(String name) {
+ super(name);
+ }
+
+ /**
+ * createItem
+ *
+ * @return
+ */
+ @Override
+ protected SortMetricItem createItem() {
+ return new SortMetricItem();
+ }
+
+ /**
+ * snapshot
+ *
+ * @return
+ */
+ public List<MetricItem> snapshot() {
+ List<MetricItem> snapshot = super.snapshot();
+ return snapshot;
+ }
+
+ /**
+ * getItemMap
+ *
+ * @return
+ */
+ public Map<String, SortMetricItem> getItemMap() {
+ return this.itemMap;
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
new file mode 100644
index 0000000..32a6a31
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * BufferQueue
+ */
+public class BufferQueue<A> {
+
+ private final LinkedBlockingQueue<A> queue;
+ private final SizeSemaphore currentTokens;
+ private SizeSemaphore globalTokens;
+ private final AtomicLong offerCount = new AtomicLong(0);
+ private final AtomicLong pollCount = new AtomicLong(0);
+
+ /**
+ * Constructor
+ *
+ * @param maxSizeKb
+ */
+ public BufferQueue(int maxSizeKb) {
+ this.queue = new LinkedBlockingQueue<>();
+ this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param maxSizeKb
+ * @param globalTokens
+ */
+ public BufferQueue(int maxSizeKb, SizeSemaphore globalTokens) {
+ this(maxSizeKb);
+ this.globalTokens = globalTokens;
+ }
+
+ /**
+ * pollRecord
+ */
+ public A pollRecord() {
+ A record = queue.poll();
+ this.pollCount.getAndIncrement();
+ return record;
+ }
+
+ /**
+ * offer
+ */
+ public void offer(A record) {
+ if (record == null) {
+ return;
+ }
+ queue.offer(record);
+ this.offerCount.incrementAndGet();
+ }
+
+ /**
+ * queue size
+ */
+ public int size() {
+ return queue.size();
+ }
+
+ /**
+ * small change
+ */
+ public int leftKb() {
+ return currentTokens.leftSemaphore();
+ }
+
+ /**
+ * availablePermits
+ */
+ public int availablePermits() {
+ return currentTokens.availablePermits();
+ }
+
+ /**
+ * maxSizeKb
+ */
+ public int maxSizeKb() {
+ return currentTokens.maxSize();
+ }
+
+ /**
+ * getIdleRate
+ */
+ public double getIdleRate() {
+ double remaining = currentTokens.availablePermits();
+ return remaining * 100.0 / currentTokens.maxSize();
+ }
+
+ /**
+ * tryAcquire
+ */
+ public boolean tryAcquire(long sizeInByte) {
+ boolean cidResult = currentTokens.tryAcquire(sizeInByte);
+ if (!cidResult) {
+ return false;
+ }
+ if (this.globalTokens == null) {
+ return true;
+ }
+ boolean globalResult = this.globalTokens.tryAcquire(sizeInByte);
+ if (globalResult) {
+ return true;
+ }
+ currentTokens.release(sizeInByte);
+ return false;
+ }
+
+ /**
+ * acquire
+ */
+ public void acquire(long sizeInByte) {
+ currentTokens.acquire(sizeInByte);
+ if (this.globalTokens != null) {
+ globalTokens.acquire(sizeInByte);
+ }
+ }
+
+ /**
+ * release
+ */
+ public void release(long sizeInByte) {
+ if (this.globalTokens != null) {
+ this.globalTokens.release(sizeInByte);
+ }
+ this.currentTokens.release(sizeInByte);
+ }
+
+ /**
+ * get offerCount
+ *
+ * @return the offerCount
+ */
+ public long getOfferCount() {
+ return offerCount.getAndSet(0);
+ }
+
+ /**
+ * get pollCount
+ *
+ * @return the pollCount
+ */
+ public long getPollCount() {
+ return pollCount.getAndSet(0);
+ }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
new file mode 100644
index 0000000..f67cb8a
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+/**
+ *
+ * Constants
+ */
+public interface Constants {
+
+ String INLONG_GROUP_ID = "inlongGroupId";
+ String INLONG_STREAM_ID = "inlongStreamId";
+ String TOPIC = "topic";
+ String HEADER_KEY_MSG_TIME = "msgTime";
+ String HEADER_KEY_SOURCE_IP = "sourceIp";
+ String HEADER_KEY_SOURCE_TIME = "sourceTime";
+ String MESSAGE_KEY = "messageKey";
+
+ String RELOAD_INTERVAL = "reloadInterval";
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
new file mode 100644
index 0000000..d4bdea7
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * InlongLoggerFactory
+ */
+public class InlongLoggerFactory {
+
+ /**
+ * getLogger
+ *
+ * @param clazz
+ * @return
+ */
+ public static Logger getLogger(Class<?> clazz) {
+ String className = clazz.getName();
+ String namePrefix = getClassNamePrefix(className, 3);
+ return LoggerFactory.getLogger(namePrefix);
+ }
+
+ /**
+ * getClassNamePrefix
+ *
+ * @param className
+ * @param layer
+ * @return
+ */
+ public static String getClassNamePrefix(String className, int layer) {
+ int index = 0;
+ for (int i = 0; i < layer; i++) {
+ int newIndex = className.indexOf('.', index + 1);
+ if (newIndex <= 0) {
+ break;
+ }
+ index = newIndex;
+ }
+ if (index == 0) {
+ return "Inlong";
+ }
+ String namePrefix = className.substring(0, index);
+ return namePrefix;
+ }
+
+// /**
+// * main
+// * @param args
+// */
+// public static void main(String[] args) {
+// int layer = 3;
+// String className = "";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// className = "ccc";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// className = "org.ccc";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// className = "org.apache.ccc";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// className = "org.apache.inlong.ccc";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// className = "org.apache.inlong.sort.ccc";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// className = "org.apache.inlong.sort.standalone.ccc";
+// System.out.println(className + ":" + getClassNamePrefix(className,
layer));
+// }
+}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/SizeSemaphore.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/SizeSemaphore.java
new file mode 100644
index 0000000..31b5334
--- /dev/null
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/SizeSemaphore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SizeSemaphore
+ */
+public class SizeSemaphore {
+
+ public static final int ONEKB = 1024;
+
+ private int maxSize = 0;
+ private int leftSize = 0;
+ private Semaphore sizeSemaphore = null;
+ private AtomicInteger leftSemaphore = new AtomicInteger(0);
+
+ /**
+ * Constructor
+ *
+ * @param maxSize
+ * @param leftSize
+ */
+ public SizeSemaphore(int maxSize, int leftSize) {
+ this.maxSize = maxSize;
+ this.leftSize = leftSize;
+ this.sizeSemaphore = new Semaphore(maxSize, true);
+ }
+
+ /**
+ * small change
+ */
+ public int leftSemaphore() {
+ return leftSemaphore.get();
+ }
+
+ /**
+ * availablePermits
+ */
+ public int availablePermits() {
+ return sizeSemaphore.availablePermits();
+ }
+
+ /**
+ * maxSize
+ */
+ public int maxSize() {
+ return maxSize;
+ }
+
+ /**
+ * getIdleRate
+ */
+ public double getIdleRate() {
+ double remaining = sizeSemaphore.availablePermits();
+ return remaining * 100.0 / maxSize;
+ }
+
+ /**
+ * tryAcquire
+ */
+ public boolean tryAcquire(long sizeInByte) {
+ int sizeInKb = (int) (sizeInByte / leftSize);
+ int sizeChange = (int) (sizeInByte % leftSize);
+ if (leftSemaphore.get() - sizeChange < 0) {
+ boolean result = sizeSemaphore.tryAcquire(sizeInKb + 1);
+ if (result) {
+ leftSemaphore.addAndGet(-sizeChange + leftSize);
+ }
+ return result;
+ } else {
+ boolean result = sizeSemaphore.tryAcquire(sizeInKb);
+ if (result) {
+ leftSemaphore.addAndGet(-sizeChange);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * acquire
+ */
+ public void acquire(long sizeInByte) {
+ int sizeInKb = (int) (sizeInByte / leftSize);
+ int sizeChange = (int) (sizeInByte % leftSize);
+ if (leftSemaphore.get() - sizeChange < 0) {
+ sizeSemaphore.acquireUninterruptibly(sizeInKb + 1);
+ leftSemaphore.addAndGet(-sizeChange + leftSize);
+ } else {
+ sizeSemaphore.acquireUninterruptibly(sizeInKb);
+ leftSemaphore.addAndGet(-sizeChange);
+ }
+ }
+
+ /**
+ * release
+ */
+ public void release(long sizeInByte) {
+ int sizeInKb = (int) (sizeInByte / leftSize);
+ int sizeChange = (int) (sizeInByte % leftSize);
+ if (leftSemaphore.get() + sizeChange > leftSize) {
+ sizeSemaphore.release(sizeInKb + 1);
+ leftSemaphore.addAndGet(sizeChange - leftSize);
+ } else {
+ sizeSemaphore.release(sizeInKb);
+ leftSemaphore.addAndGet(sizeChange);
+ }
+ }
+}