This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fccece5  [INLONG-1890] Inlong-Sort-Standalone add 
sort-standalone-common module. (#1923)
fccece5 is described below

commit fccece5c05bc96444f887d6cca2cd9e5f85cf687
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Dec 8 15:16:18 2021 +0800

    [INLONG-1890] Inlong-Sort-Standalone add sort-standalone-common module. 
(#1923)
---
 inlong-sort-standalone/pom.xml                     |  30 +++
 .../sort-standalone-common/pom.xml                 |  33 +++
 .../config/holder/CommonPropertiesHolder.java      | 164 +++++++++++++++
 .../config/holder/SortClusterConfigHolder.java     | 154 ++++++++++++++
 .../ClassResourceCommonPropertiesLoader.java       |  69 +++++++
 .../ClassResourceSortClusterConfigLoader.java      |  67 ++++++
 .../config/loader/CommonPropertiesLoader.java      |  34 +++
 .../loader/ManagerSortClusterConfigLoader.java     | 120 +++++++++++
 .../config/loader/SortClusterConfigLoader.java     |  38 ++++
 .../standalone/config/pojo/CacheClusterConfig.java |  68 ++++++
 .../sort/standalone/config/pojo/InlongId.java      |  50 +++++
 .../standalone/config/pojo/SortClusterConfig.java  |  68 ++++++
 .../standalone/config/pojo/SortClusterRequest.java |  83 ++++++++
 .../config/pojo/SortClusterResponse.java           | 215 +++++++++++++++++++
 .../standalone/config/pojo/SortTaskConfig.java     | 230 +++++++++++++++++++++
 .../standalone/config/pojo/type/CacheType.java     |  70 +++++++
 .../sort/standalone/config/pojo/type/DataType.java |  70 +++++++
 .../sort/standalone/config/pojo/type/SortType.java |  72 +++++++
 .../sort/standalone/metrics/MetricItemValue.java   |  73 +++++++
 .../sort/standalone/metrics/MetricListener.java    |  39 ++++
 .../standalone/metrics/MetricListenerRunnable.java | 132 ++++++++++++
 .../sort/standalone/metrics/MetricObserver.java    | 114 ++++++++++
 .../sort/standalone/metrics/SortMetricItem.java    | 120 +++++++++++
 .../sort/standalone/metrics/SortMetricItemSet.java |  75 +++++++
 .../inlong/sort/standalone/utils/BufferQueue.java  | 167 +++++++++++++++
 .../inlong/sort/standalone/utils/Constants.java    |  35 ++++
 .../sort/standalone/utils/InlongLoggerFactory.java |  85 ++++++++
 .../sort/standalone/utils/SizeSemaphore.java       | 126 +++++++++++
 28 files changed, 2601 insertions(+)

diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index 09515fb..c329ff1 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -32,6 +32,7 @@
     <name>Apache InLong - Sort - Standalone</name>
 
     <modules>
+        <module>sort-standalone-common</module>
         <module>sort-standalone-source</module>
         <module>sort-standalone-dist</module>
     </modules>
@@ -42,6 +43,12 @@
         <flume.version>1.9.0</flume.version>
         <plugin.assembly.version>3.2.0</plugin.assembly.version>
         <pulsar.version>2.7.2</pulsar.version>
+        <junit.version>4.13</junit.version>
+        <guava.version>19.0</guava.version>
+        <skipTests>false</skipTests>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compiler.source>1.8</compiler.source>
+        <compiler.target>1.8</compiler.target>
     </properties>
 
     <dependencies>
@@ -70,6 +77,29 @@
             <artifactId>inlong-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>2.0.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>2.0.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-common/pom.xml 
b/inlong-sort-standalone/sort-standalone-common/pom.xml
new file mode 100644
index 0000000..4c99a28
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+    license agreements. See the NOTICE file distributed with this work for 
additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    you under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong-sort-standalone</artifactId>
+        <version>0.12.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <name>Apache InLong - SortStandalone Common</name>
+    <artifactId>sort-standalone-common</artifactId>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <compiler.source>1.8</compiler.source>
+        <compiler.target>1.8</compiler.target>
+    </properties>
+
+    <dependencies>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
new file mode 100644
index 0000000..6006ccf
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.flume.Context;
+import 
org.apache.inlong.sort.standalone.config.loader.ClassResourceCommonPropertiesLoader;
+import org.apache.inlong.sort.standalone.config.loader.CommonPropertiesLoader;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * 
+ * CommonPropertiesHolder
+ */
+public class CommonPropertiesHolder {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(CommonPropertiesHolder.class);
+    public static final String KEY_COMMON_PROPERTIES = 
"common-properties-loader";
+    public static final String DEFAULT_LOADER = 
ClassResourceCommonPropertiesLoader.class.getName();
+    public static final String KEY_CLUSTER_ID = "clusterId";
+
+    private static Map<String, String> props;
+    private static Context context;
+
+    /**
+     * init
+     */
+    private static void init() {
+        synchronized (KEY_COMMON_PROPERTIES) {
+            if (props == null) {
+                props = new ConcurrentHashMap<>();
+                String loaderClassName = System.getenv(KEY_COMMON_PROPERTIES);
+                loaderClassName = (loaderClassName == null) ? DEFAULT_LOADER : 
loaderClassName;
+                try {
+                    Class<?> loaderClass = 
ClassUtils.getClass(loaderClassName);
+                    Object loaderObject = 
loaderClass.getDeclaredConstructor().newInstance();
+                    if (loaderObject instanceof CommonPropertiesLoader) {
+                        CommonPropertiesLoader loader = 
(CommonPropertiesLoader) loaderObject;
+                        props.putAll(loader.load());
+                        LOG.info("loaderClass:{},properties:{}", 
loaderClassName, props);
+                    }
+                } catch (Throwable t) {
+                    LOG.error("Fail to init 
CommonPropertiesLoader,loaderClass:{},error:{}",
+                            loaderClassName, t.getMessage());
+                    LOG.error(t.getMessage(), t);
+                }
+                context = new Context(props);
+            }
+        }
+    }
+
+    /**
+     * get props
+     * 
+     * @return the props
+     */
+    public static Map<String, String> get() {
+        if (props != null) {
+            return props;
+        }
+        init();
+        return props;
+    }
+
+    /**
+     * get context
+     * 
+     * @return the context
+     */
+    public static Context getContext() {
+        if (context != null) {
+            return context;
+        }
+        init();
+        return context;
+    }
+
+    /**
+     * Gets value mapped to key, returning defaultValue if unmapped.
+     * 
+     * @param  key          to be found
+     * @param  defaultValue returned if key is unmapped
+     * @return              value associated with key
+     */
+    public static String getString(String key, String defaultValue) {
+        return get().getOrDefault(key, defaultValue);
+    }
+
+    /**
+     * Gets value mapped to key, returning null if unmapped.
+     * 
+     * @param  key to be found
+     * @return     value associated with key or null if unmapped
+     */
+    public static String getString(String key) {
+        return get().get(key);
+    }
+
+    /**
+     * Gets value mapped to key, returning defaultValue if unmapped.
+     * 
+     * @param  key          to be found
+     * @param  defaultValue returned if key is unmapped
+     * @return              value associated with key
+     */
+    public static Long getLong(String key, Long defaultValue) {
+        return NumberUtils.toLong(get().get(key), defaultValue);
+    }
+
+    /**
+     * Gets value mapped to key, returning null if unmapped.
+     * 
+     * @param  key to be found
+     * @return     value associated with key or null if unmapped
+     */
+    public static Long getLong(String key) {
+        String strValue = get().get(key);
+        Long value = (strValue == null) ? null : 
NumberUtils.toLong(get().get(key));
+        return value;
+    }
+
+    /**
+     * getStringFromContext
+     * 
+     * @param  context
+     * @param  key
+     * @param  defaultValue
+     * @return
+     */
+    public static String getStringFromContext(Context context, String key, 
String defaultValue) {
+        String value = context.getString(key);
+        value = (value != null) ? value : props.getOrDefault(key, 
defaultValue);
+        return value;
+    }
+
+    /**
+     * getClusterId
+     * 
+     * @return
+     */
+    public static String getClusterId() {
+        return getString(KEY_CLUSTER_ID);
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
new file mode 100644
index 0000000..67b4b6a
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+import static 
org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader.SORT_CLUSTER_CONFIG_TYPE;
+import static 
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import 
org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader;
+import org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * SortClusterConfigHolder
+ */
+public final class SortClusterConfigHolder {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
+
+    private static SortClusterConfigHolder instance;
+
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private SortClusterConfigLoader loader;
+    private SortClusterConfig config;
+
+    /**
+     * Constructor
+     */
+    private SortClusterConfigHolder() {
+
+    }
+
+    /**
+     * getInstance
+     * 
+     * @return
+     */
+    private static SortClusterConfigHolder get() {
+        if (instance != null) {
+            return instance;
+        }
+        synchronized (SortClusterConfigHolder.class) {
+            instance = new SortClusterConfigHolder();
+            instance.reloadInterval = 
CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
+            String loaderType = 
CommonPropertiesHolder.getString(SORT_CLUSTER_CONFIG_TYPE,
+                    ClassResourceSortClusterConfigLoader.class.getName());
+            try {
+                Class<?> loaderClass = ClassUtils.getClass(loaderType);
+                Object loaderObject = 
loaderClass.getDeclaredConstructor().newInstance();
+                if (loaderObject instanceof SortClusterConfigLoader) {
+                    instance.loader = (SortClusterConfigLoader) loaderObject;
+                }
+            } catch (Throwable t) {
+                LOG.error("Fail to init loader,loaderType:{},error:{}", 
loaderType, t.getMessage());
+                LOG.error(t.getMessage(), t);
+            }
+            if (instance.loader == null) {
+                instance.loader = new ClassResourceSortClusterConfigLoader();
+            }
+            try {
+                instance.loader.configure(new 
Context(CommonPropertiesHolder.get()));
+                instance.reload();
+                instance.setReloadTimer();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * setReloadTimer
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            /**
+             * run
+             */
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
+    }
+
+    /**
+     * reload
+     */
+    private void reload() {
+        try {
+            SortClusterConfig newConfig = this.loader.load();
+            if (newConfig != null) {
+                this.config = newConfig;
+            }
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * getClusterConfig
+     * 
+     * @return
+     */
+    public static SortClusterConfig getClusterConfig() {
+        return get().config;
+    }
+
+    /**
+     * getTaskConfig
+     * 
+     * @param  sortTaskName
+     * @return
+     */
+    public static SortTaskConfig getTaskConfig(String sortTaskName) {
+        SortClusterConfig config = get().config;
+        if (config != null) {
+            for (SortTaskConfig task : config.getSortTasks()) {
+                if (StringUtils.equals(sortTaskName, task.getName())) {
+                    return task;
+                }
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceCommonPropertiesLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceCommonPropertiesLoader.java
new file mode 100644
index 0000000..81d4dd0
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceCommonPropertiesLoader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * 
+ * FileCommonPropertiesLoader
+ */
+public class ClassResourceCommonPropertiesLoader implements 
CommonPropertiesLoader {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+    public static final String FILENAME = "common.properties";
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public Map<String, String> load() {
+        return this.loadProperties(FILENAME);
+    }
+
+    /**
+     * loadProperties
+     * 
+     * @param  fileName
+     * @return
+     */
+    protected Map<String, String> loadProperties(String fileName) {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = 
getClass().getClassLoader().getResource(fileName).openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", 
fileName, e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", 
fileName, e);
+        }
+        return result;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
new file mode 100644
index 0000000..6193680
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.google.gson.Gson;
+
+/**
+ * 
+ * ClassResourceCommonPropertiesLoader
+ */
+public class ClassResourceSortClusterConfigLoader implements 
SortClusterConfigLoader {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+    public static final String FILENAME = "SortClusterConfig.conf";
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public SortClusterConfig load() {
+        try {
+            String confString = 
IOUtils.toString(getClass().getClassLoader().getResource(FILENAME));
+            Gson gson = new Gson();
+            SortClusterConfig config = gson.fromJson(confString, 
SortClusterConfig.class);
+            return config;
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", 
FILENAME, e);
+        } catch (Exception e) {
+            LOG.error("fail to load properties, file ={}, and e= {}", 
FILENAME, e);
+        }
+        return new SortClusterConfig();
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesLoader.java
new file mode 100644
index 0000000..b0a6c50
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesLoader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.util.Map;
+
+/**
+ * 
+ * CommonPropertiesLoader
+ */
+public interface CommonPropertiesLoader {
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    Map<String, String> load();
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
new file mode 100644
index 0000000..249b5a4
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterResponse;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * 
+ * ManagerSortClusterConfigLoader
+ */
+public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader 
{
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+
+    private Context context;
+    private CloseableHttpClient httpClient;
+    private Gson gson = new Gson();
+    private String md5;
+
+    /**
+     * constructHttpClient
+     * 
+     * @return
+     */
+    private static synchronized CloseableHttpClient constructHttpClient() {
+        long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout((int) timeoutInMs)
+                .setSocketTimeout((int) timeoutInMs).build();
+        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+        httpClientBuilder.setDefaultRequestConfig(requestConfig);
+        return httpClientBuilder.build();
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.httpClient = constructHttpClient();
+    }
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public SortClusterConfig load() {
+        HttpGet httpGet = null;
+        try {
+            String clusterName = 
this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+            String url = this.context.getString(SORT_CLUSTER_CONFIG_MANAGER) + 
"?apiVersion=1.0&clusterName="
+                    + clusterName + "&md5=";
+            if (StringUtils.isNotBlank(this.md5)) {
+                url += this.md5;
+            }
+            LOG.info("start to request {} to get config info", url);
+            httpGet = new HttpGet(url);
+            httpGet.addHeader(HttpHeaders.CONNECTION, "close");
+
+            // request with get
+            CloseableHttpResponse response = httpClient.execute(httpGet);
+            String returnStr = EntityUtils.toString(response.getEntity());
+            LOG.info("end to request {},result:{}", url, returnStr);
+            // get groupId <-> topic and m value.
+
+            SortClusterResponse clusterResponse = gson.fromJson(returnStr, 
SortClusterResponse.class);
+            if (!clusterResponse.isResult()) {
+                LOG.info("Fail to get config info from url:{}, error code is 
{}", url, clusterResponse.getErrCode());
+                return null;
+            }
+
+            this.md5 = clusterResponse.getMd5();
+            return clusterResponse.getData();
+        } catch (Exception ex) {
+            LOG.error("exception caught", ex);
+            return null;
+        } finally {
+            if (httpGet != null) {
+                httpGet.releaseConnection();
+            }
+        }
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
new file mode 100644
index 0000000..333c33d
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+
+/**
+ * 
+ * SortClusterConfigLoader
+ */
+public interface SortClusterConfigLoader extends Configurable {
+
+    String SORT_CLUSTER_CONFIG_TYPE = "sortClusterConfig.type";
+    String SORT_CLUSTER_CONFIG_MANAGER = "sortClusterConfig.managerUrl";
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    SortClusterConfig load();
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
new file mode 100644
index 0000000..97b8c05
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 
+ * CacheClusterConfig
+ */
+public class CacheClusterConfig {
+
+    private String clusterName;
+    private Map<String, String> params = new HashMap<>();
+
+    /**
+     * get clusterName
+     * 
+     * @return the clusterName
+     */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /**
+     * set clusterName
+     * 
+     * @param clusterName the clusterName to set
+     */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * get params
+     * 
+     * @return the params
+     */
+    public Map<String, String> getParams() {
+        return params;
+    }
+
+    /**
+     * set params
+     * 
+     * @param params the params to set
+     */
+    public void setParams(Map<String, String> params) {
+        this.params = params;
+    }
+
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/InlongId.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/InlongId.java
new file mode 100644
index 0000000..99d9aa9
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/InlongId.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * 
+ * InlongId
+ */
+public class InlongId {
+
+    /**
+     * generateUid
+     * 
+     * @param  inlongGroupId
+     * @param  inlongStreamId
+     * @return
+     */
+    public static String generateUid(String inlongGroupId, String 
inlongStreamId) {
+        if (StringUtils.isBlank(inlongGroupId)) {
+            if (StringUtils.isBlank(inlongStreamId)) {
+                return "";
+            } else {
+                return inlongStreamId;
+            }
+        } else {
+            if (StringUtils.isBlank(inlongStreamId)) {
+                return inlongGroupId;
+            } else {
+                return inlongGroupId + "." + inlongStreamId;
+            }
+        }
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
new file mode 100644
index 0000000..a720ae0
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 
+ * SortClusterConfig
+ */
+public class SortClusterConfig {
+
+    private String clusterName;
+    private List<SortTaskConfig> sortTasks = new ArrayList<>();
+
+    /**
+     * get clusterName
+     * 
+     * @return the clusterName
+     */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /**
+     * set clusterName
+     * 
+     * @param clusterName the clusterName to set
+     */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * get sortTasks
+     * 
+     * @return the sortTasks
+     */
+    public List<SortTaskConfig> getSortTasks() {
+        return sortTasks;
+    }
+
+    /**
+     * set sortTasks
+     * 
+     * @param sortTasks the sortTasks to set
+     */
+    public void setSortTasks(List<SortTaskConfig> sortTasks) {
+        this.sortTasks = sortTasks;
+    }
+
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterRequest.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterRequest.java
new file mode 100644
index 0000000..19ad62b
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterRequest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+/**
+ * 
+ * SortClusterRequest
+ */
+public class SortClusterRequest {
+
+    private String clusterName;
+    private String md5;
+    private String apiVersion = "1.0";
+
+    /**
+     * get clusterName
+     * 
+     * @return the clusterName
+     */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /**
+     * set clusterName
+     * 
+     * @param clusterName the clusterName to set
+     */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * get md5
+     * 
+     * @return the md5
+     */
+    public String getMd5() {
+        return md5;
+    }
+
+    /**
+     * set md5
+     * 
+     * @param md5 the md5 to set
+     */
+    public void setMd5(String md5) {
+        this.md5 = md5;
+    }
+
+    /**
+     * get apiVersion
+     * 
+     * @return the apiVersion
+     */
+    public String getApiVersion() {
+        return apiVersion;
+    }
+
+    /**
+     * set apiVersion
+     * 
+     * @param apiVersion the apiVersion to set
+     */
+    public void setApiVersion(String apiVersion) {
+        this.apiVersion = apiVersion;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
new file mode 100644
index 0000000..a04c24f
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+/**
+ * 
+ * SortClusterResponse
+ */
+public class SortClusterResponse {
+
+    public static final int SUCC = 0;
+    public static final int NOUPDATE = 1;
+    public static final int FAIL = -1;
+    public static final int REQ_PARAMS_ERROR = -101;
+
+    private boolean result;
+    private int errCode;
+    private String md5;
+    private SortClusterConfig data;
+
+    /**
+     * get result
+     * 
+     * @return the result
+     */
+    public boolean isResult() {
+        return result;
+    }
+
+    /**
+     * set result
+     * 
+     * @param result the result to set
+     */
+    public void setResult(boolean result) {
+        this.result = result;
+    }
+
+    /**
+     * get errCode
+     * 
+     * @return the errCode
+     */
+    public int getErrCode() {
+        return errCode;
+    }
+
+    /**
+     * set errCode
+     * 
+     * @param errCode the errCode to set
+     */
+    public void setErrCode(int errCode) {
+        this.errCode = errCode;
+    }
+
+    /**
+     * get md5
+     * 
+     * @return the md5
+     */
+    public String getMd5() {
+        return md5;
+    }
+
+    /**
+     * set md5
+     * 
+     * @param md5 the md5 to set
+     */
+    public void setMd5(String md5) {
+        this.md5 = md5;
+    }
+
+    /**
+     * get data
+     * 
+     * @return the data
+     */
+    public SortClusterConfig getData() {
+        return data;
+    }
+
+    /**
+     * set data
+     * 
+     * @param data the data to set
+     */
+    public void setData(SortClusterConfig data) {
+        this.data = data;
+    }
+//
+//    /**
+//     * generateTdbankConfig
+//     * 
+//     * @return
+//     */
+//    public static SortClusterConfig generateTdbankConfig() {
+//        SortClusterConfig clusterConfig = new SortClusterConfig();
+//        clusterConfig.setClusterName("tdbankv3-sz-sz1");
+//        //
+//        List<SortTaskConfig> sortTasks = new ArrayList<>();
+//        clusterConfig.setSortTasks(sortTasks);
+//        SortTaskConfig taskConfig = new SortTaskConfig();
+//        sortTasks.add(taskConfig);
+//        taskConfig.setName("sid_tdbank_atta6th_v3");
+//        taskConfig.setType(SortType.TQTDBANK);
+//        //
+//        Map<String, String> sinkParams = new HashMap<>();
+//        taskConfig.setSinkParams(sinkParams);
+//        sinkParams.put("b_pcg_venus_szrecone_124_153_utf8", 
"10.56.15.195:46801,10.56.15.212:46801,"
+//                + "10.56.15.220:46801,10.56.15.221:46801,"
+//                + 
"10.56.15.230:46801,10.56.16.20:46801,10.56.16.38:46801,10.56.20.21:46801,10.56.20.80:46801,"
+//                + 
"10.56.20.85:46801,10.56.209.205:46801,10.56.21.17:46801,10.56.21.20:46801,10.56.21.79:46801,"
+//                + 
"10.56.21.85:46801,10.56.81.205:46801,10.56.81.211:46801,10.56.82.11:46801,10.56.82.12:46801,"
+//                + 
"10.56.82.37:46801,10.56.82.38:46801,10.56.82.40:46801,10.56.83.143:46801,10.56.83.80:46801,"
+//                + "10.56.84.17:46801");
+//        //
+//        List<Map<String, String>> idParams = new ArrayList<>();
+//        Map<String, String> idParam = new HashMap<>();
+//        idParams.add(idParam);
+//        taskConfig.setIdParams(idParams);
+//        idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
+//        idParam.put(Constants.INLONG_STREAM_ID, "");
+//        idParam.put(TdbankConfig.KEY_BID, 
"b_pcg_venus_szrecone_124_153_utf8");
+//        idParam.put(TdbankConfig.KEY_TID, "t_sh_atta_v2_0fc00000046");
+//        idParam.put(TdbankConfig.KEY_DATA_TYPE, 
TdbankConfig.DATA_TYPE_ATTA_TEXT);
+//        return clusterConfig;
+//    }
+//
+//    /**
+//     * generateCdmqConfig
+//     * 
+//     * @return
+//     */
+//    public static SortClusterConfig generateCdmqConfig() {
+//        SortClusterConfig clusterConfig = new SortClusterConfig();
+//        clusterConfig.setClusterName("cdmqv3-sz-sz1");
+//        //
+//        List<SortTaskConfig> sortTasks = new ArrayList<>();
+//        clusterConfig.setSortTasks(sortTasks);
+//        SortTaskConfig taskConfig = new SortTaskConfig();
+//        sortTasks.add(taskConfig);
+//        taskConfig.setName("sid_cdmq_kg_videorequest_v3");
+//        taskConfig.setType(SortType.CDMQ);
+//        //
+//        Map<String, String> sinkParams = new HashMap<>();
+//        taskConfig.setSinkParams(sinkParams);
+//        sinkParams.put("cdmqAccessPoint", 
"cdmqszentry01.data.mig:10005,cdmqszentry05.data.mig:10033");
+//        sinkParams.put("cdmqClusterId", "kg_videorequest");
+//        sinkParams.put("clientId", "p_video_atta_196");
+//        sinkParams.put("batchSize", "122880");
+//        sinkParams.put("maxRequestSize", "8388608");
+//        sinkParams.put("lingerMs", "150");
+//        //
+//        List<Map<String, String>> idParams = new ArrayList<>();
+//        Map<String, String> idParam = new HashMap<>();
+//        idParams.add(idParam);
+//        taskConfig.setIdParams(idParams);
+//        idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
+//        idParam.put(Constants.TOPIC, "U_TOPIC_0fc00000046");
+//        return clusterConfig;
+//    }
+//
+//    /**
+//     * main
+//     * 
+//     * @param args
+//     */
+//    public static void main(String[] args) {
+//        // tdbank
+//        {
+//            SortClusterConfig config = generateTdbankConfig();
+//            String configString = JSON.toJSONString(config, false);
+//            System.out.println("tdbank:" + configString);
+//            String md5 = DigestUtils.md5Hex(configString);
+//            SortClusterResponse response = new SortClusterResponse();
+//            response.setResult(true);
+//            response.setErrCode(SUCC);
+//            response.setMd5(md5);
+//            response.setData(config);
+//            String responseString = JSON.toJSONString(response, true);
+//            System.out.println("tdbank responseString:" + responseString);
+//        }
+//        // cdmq
+//        {
+//            SortClusterConfig config = generateCdmqConfig();
+//            String configString = JSON.toJSONString(config, false);
+//            System.out.println("cdmq:" + configString);
+//            String md5 = DigestUtils.md5Hex(configString);
+//            SortClusterResponse response = new SortClusterResponse();
+//            response.setResult(true);
+//            response.setErrCode(SUCC);
+//            response.setMd5(md5);
+//            response.setData(config);
+//            String responseString = JSON.toJSONString(response, true);
+//            System.out.println("cdmq responseString:" + responseString);
+//        }
+//    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
new file mode 100644
index 0000000..ca99a4b
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortTaskConfig.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.type.SortType;
+
+/**
+ * 
+ * SortTaskConfig
+ */
+public class SortTaskConfig {
+
+    public static final String KEY_TASK_NAME = "taskName";
+    public static final String KEY_SORT_CHANNEL_TYPE = "sortChannel.type";
+    public static final String KEY_SORT_SINK_TYPE = "sortSink.type";
+    public static final String KEY_SORT_SOURCE_TYPE = "sortSource.type";
+
+    private String name;
+    private SortType type;
+    private List<Map<String, String>> idParams = new ArrayList<>();
+    private Map<String, String> sinkParams = new HashMap<>();
+
+    /**
+     * get name
+     * 
+     * @return the name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * set name
+     * 
+     * @param name the name to set
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * get type
+     * 
+     * @return the type
+     */
+    public SortType getType() {
+        return type;
+    }
+
+    /**
+     * set type
+     * 
+     * @param type the type to set
+     */
+    public void setType(SortType type) {
+        this.type = type;
+    }
+
+    /**
+     * get idParams
+     * 
+     * @return the idParams
+     */
+    public List<Map<String, String>> getIdParams() {
+        return idParams;
+    }
+
+    /**
+     * set idParams
+     * 
+     * @param idParams the idParams to set
+     */
+    public void setIdParams(List<Map<String, String>> idParams) {
+        this.idParams = idParams;
+    }
+
+    /**
+     * get sinkParams
+     * 
+     * @return the sinkParams
+     */
+    public Map<String, String> getSinkParams() {
+        return sinkParams;
+    }
+
+    /**
+     * set sinkParams
+     * 
+     * @param sinkParams the sinkParams to set
+     */
+    public void setSinkParams(Map<String, String> sinkParams) {
+        this.sinkParams = sinkParams;
+    }
+
+    /**
+     * getFlumeConfProperties
+     * 
+     * @return Map
+     */
+    public Map<String, String> generateFlumeConfiguration() {
+        Map<String, String> flumeConf = new HashMap<>();
+        // channels
+        this.appendChannels(flumeConf);
+        // sinks
+        this.appendSinks(flumeConf);
+        // sources
+        this.appendSources(flumeConf);
+        return flumeConf;
+    }
+
+    /**
+     * appendChannels
+     * 
+     * @param flumeConf
+     */
+    private void appendChannels(Map<String, String> flumeConf) {
+        StringBuilder builder = new StringBuilder();
+        String channelName = name + "Channel";
+        flumeConf.put(name + ".channels", channelName);
+        String prefix = 
builder.append(name).append(".channels.").append(channelName).append(".").toString();
+        builder.setLength(0);
+        String channelType = builder.append(prefix).append("type").toString();
+        String channelClass = 
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
+        flumeConf.put(channelType, channelClass);
+        this.appendCommon(flumeConf, prefix, null);
+    }
+
+    /**
+     * appendCommon
+     * 
+     * @param flumeConf
+     * @param prefix
+     * @param componentParams
+     */
+    private void appendCommon(Map<String, String> flumeConf, String prefix, 
Map<String, String> componentParams) {
+        StringBuilder builder = new StringBuilder();
+        String taskName = 
builder.append(prefix).append(KEY_TASK_NAME).toString();
+        flumeConf.put(taskName, name);
+        // CommonProperties
+        for (Entry<String, String> entry : 
CommonPropertiesHolder.get().entrySet()) {
+            builder.setLength(0);
+            String key = 
builder.append(prefix).append(entry.getKey()).toString();
+            flumeConf.put(key, entry.getValue());
+        }
+        // componentParams
+        if (componentParams != null) {
+            for (Entry<String, String> entry : componentParams.entrySet()) {
+                builder.setLength(0);
+                String key = 
builder.append(prefix).append(entry.getKey()).toString();
+                flumeConf.put(key, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * appendSinks
+     * 
+     * @param flumeConf
+     */
+    private void appendSinks(Map<String, String> flumeConf) {
+        // sinks
+        String sinkName = name + "Sink";
+        flumeConf.put(name + ".sinks", sinkName);
+        StringBuilder builder = new StringBuilder();
+        String prefix = 
builder.append(name).append(".sinks.").append(sinkName).append(".").toString();
+        // type
+        builder.setLength(0);
+        String sinkType = builder.append(prefix).append("type").toString();
+        String sinkClass = 
CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
+        flumeConf.put(sinkType, sinkClass);
+        // channel
+        builder.setLength(0);
+        String channelKey = 
builder.append(prefix).append("channel").toString();
+        String channelName = name + "Channel";
+        flumeConf.put(channelKey, channelName);
+        //
+        this.appendCommon(flumeConf, prefix, sinkParams);
+    }
+
+    /**
+     * appendSources
+     * 
+     * @param flumeConf
+     */
+    private void appendSources(Map<String, String> flumeConf) {
+        // sources
+        String sourceName = name + "Source";
+        flumeConf.put(name + ".sources", sourceName);
+        StringBuilder builder = new StringBuilder();
+        String prefix = 
builder.append(name).append(".sources.").append(sourceName).append(".").toString();
+        // type
+        builder.setLength(0);
+        String sourceType = builder.append(prefix).append("type").toString();
+        String sourceClass = 
CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
+        flumeConf.put(sourceType, sourceClass);
+        // channel
+        builder.setLength(0);
+        String channelKey = 
builder.append(prefix).append("channels").toString();
+        String channelName = name + "Channel";
+        flumeConf.put(channelKey, channelName);
+        // selector.type
+        builder.setLength(0);
+        String selectorTypeKey = 
builder.append(prefix).append("selector.type").toString();
+        flumeConf.put(selectorTypeKey, 
"org.apache.flume.channel.ReplicatingChannelSelector");
+        //
+        this.appendCommon(flumeConf, prefix, null);
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
new file mode 100644
index 0000000..963dc6e
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo.type;
+
+/**
+ * cache cluster type
+ */
+public enum CacheType {
+
+    TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
+
+    private final String value;
+
+    /**
+     * 
+     * Constructor
+     * 
+     * @param value
+     */
+    private CacheType(String value) {
+        this.value = value;
+    }
+
+    /**
+     * value
+     *
+     * @return
+     */
+    public String value() {
+        return this.value;
+    }
+
+    /**
+     * toString
+     */
+    @Override
+    public String toString() {
+        return this.name() + ":" + this.value;
+    }
+
+    /**
+     * convert
+     *
+     * @param  value
+     * @return
+     */
+    public static CacheType convert(String value) {
+        for (CacheType v : values()) {
+            if (v.value().equals(value)) {
+                return v;
+            }
+        }
+        return N;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
new file mode 100644
index 0000000..882e29e
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo.type;
+
+/**
+ * data content type
+ */
+public enum DataType {
+
+    TEXT("text"), PB("pb"), JCE("jce"), N("n");
+
+    private final String value;
+
+    /**
+     * 
+     * Constructor
+     * 
+     * @param value
+     */
+    private DataType(String value) {
+        this.value = value;
+    }
+
+    /**
+     * value
+     *
+     * @return
+     */
+    public String value() {
+        return this.value;
+    }
+
+    /**
+     * toString
+     */
+    @Override
+    public String toString() {
+        return this.name() + ":" + this.value;
+    }
+
+    /**
+     * convert
+     *
+     * @param  value
+     * @return
+     */
+    public static DataType convert(String value) {
+        for (DataType v : values()) {
+            if (v.value().equals(value)) {
+                return v;
+            }
+        }
+        return N;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
new file mode 100644
index 0000000..e066928
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.pojo.type;
+
+/**
+ * 
+ * SortType
+ */
+public enum SortType {
+
+    HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), 
ElasticSearch("ElasticSearch"), THTDBANK(
+            "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), N("n");
+
+    private final String value;
+
+    /**
+     * 
+     * Constructor
+     * 
+     * @param value
+     */
+    private SortType(String value) {
+        this.value = value;
+    }
+
+    /**
+     * value
+     *
+     * @return
+     */
+    public String value() {
+        return this.value;
+    }
+
+    /**
+     * toString
+     */
+    @Override
+    public String toString() {
+        return this.name() + ":" + this.value;
+    }
+
+    /**
+     * convert
+     *
+     * @param  value
+     * @return
+     */
+    public static SortType convert(String value) {
+        for (SortType v : values()) {
+            if (v.value().equals(value)) {
+                return v;
+            }
+        }
+        return N;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
new file mode 100644
index 0000000..0f55fc1
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricItemValue.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricValue;
+
+/**
+ * 
+ * MetricItemValue
+ */
+public class MetricItemValue {
+
+    private final String key;
+    private final Map<String, String> dimensions;
+    private final Map<String, MetricValue> metrics;
+
+    /**
+     * Constructor
+     * 
+     * @param key
+     * @param dimensions
+     * @param metrics
+     */
+    public MetricItemValue(String key, Map<String, String> dimensions, 
Map<String, MetricValue> metrics) {
+        this.key = key;
+        this.dimensions = dimensions;
+        this.metrics = metrics;
+    }
+
+    /**
+     * get key
+     * 
+     * @return the key
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * get dimensions
+     * 
+     * @return the dimensions
+     */
+    public Map<String, String> getDimensions() {
+        return dimensions;
+    }
+
+    /**
+     * get metrics
+     * 
+     * @return the metrics
+     */
+    public Map<String, MetricValue> getMetrics() {
+        return metrics;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
new file mode 100644
index 0000000..95dad6a
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListener.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.List;
+
+/**
+ * 
+ * MetricListener
+ */
+public interface MetricListener {
+
+    String KEY_METRIC_DOMAINS = "metricDomains";
+    String KEY_DOMAIN_LISTENERS = "domainListeners";
+    String KEY_SNAPSHOT_INTERVAL = "snapshotInterval";
+
+    /**
+     * snapshot
+     * 
+     * @param domain
+     * @param itemValues
+     */
+    public void snapshot(String domain, List<MetricItemValue> itemValues);
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
new file mode 100644
index 0000000..0cdf29f
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * 
+ * MetricListenerRunnable
+ */
+public class MetricListenerRunnable implements Runnable {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(MetricObserver.class);
+
+    private String domain;
+    private List<MetricListener> listenerList;
+
+    /**
+     * Constructor
+     * 
+     * @param domain
+     * @param listenerList
+     */
+    public MetricListenerRunnable(String domain, List<MetricListener> 
listenerList) {
+        this.domain = domain;
+        this.listenerList = listenerList;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        LOG.info("begin to snapshot metric:{}", domain);
+        try {
+            List<MetricItemValue> itemValues = this.getItemValues();
+            LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
+            this.listenerList.forEach((item) -> {
+                item.snapshot(domain, itemValues);
+            });
+        } catch (Throwable t) {
+            LOG.error(t.getMessage(), t);
+        }
+        LOG.info("end to snapshot metric:{}", domain);
+    }
+
+    /**
+     * getItemValues
+     * 
+     * @return                              MetricItemValue List
+     * @throws InstanceNotFoundException
+     * @throws AttributeNotFoundException
+     * @throws ReflectionException
+     * @throws MBeanException
+     * @throws MalformedObjectNameException
+     * @throws ClassNotFoundException
+     */
+    @SuppressWarnings("unchecked")
+    private List<MetricItemValue> getItemValues() throws 
InstanceNotFoundException, AttributeNotFoundException,
+            ReflectionException, MBeanException, MalformedObjectNameException, 
ClassNotFoundException {
+        ObjectName objName = new ObjectName(domain + 
MetricItemMBean.DOMAIN_SEPARATOR + "*");
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
+        LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
+        List<MetricItemValue> itemValues = new ArrayList<>();
+        for (ObjectInstance mbean : mbeans) {
+            String className = mbean.getClassName();
+            Class<?> clazz = ClassUtils.getClass(className);
+            if (ClassUtils.isAssignable(clazz, MetricItemMBean.class)) {
+                ObjectName metricObjectName = mbean.getObjectName();
+                String dimensionsKey = (String) 
mbs.getAttribute(metricObjectName,
+                        MetricItemMBean.ATTRIBUTE_KEY);
+                Map<String, String> dimensions = (Map<String, String>) mbs
+                        .getAttribute(metricObjectName, 
MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+                Map<String, MetricValue> metrics = (Map<String, MetricValue>) 
mbs
+                        .invoke(metricObjectName, 
MetricItemMBean.METHOD_SNAPSHOT, null, null);
+                MetricItemValue itemValue = new MetricItemValue(dimensionsKey, 
dimensions, metrics);
+                LOG.info("MetricItemMBean get itemValue:{}", itemValue);
+                itemValues.add(itemValue);
+            } else if (ClassUtils.isAssignable(clazz, 
MetricItemSetMBean.class)) {
+                ObjectName metricObjectName = mbean.getObjectName();
+                List<MetricItem> items = (List<MetricItem>) 
mbs.invoke(metricObjectName,
+                        MetricItemMBean.METHOD_SNAPSHOT, null, null);
+                for (MetricItem item : items) {
+                    String dimensionsKey = item.getDimensionsKey();
+                    Map<String, String> dimensions = item.getDimensions();
+                    Map<String, MetricValue> metrics = item.snapshot();
+                    MetricItemValue itemValue = new 
MetricItemValue(dimensionsKey, dimensions, metrics);
+                    LOG.info("MetricItemSetMBean get itemValue:{}", itemValue);
+                    itemValues.add(itemValue);
+                }
+            }
+        }
+        return itemValues;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
new file mode 100644
index 0000000..a7865a1
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricObserver.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * MetricObserver
+ */
+public class MetricObserver {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(MetricObserver.class);
+    private static final AtomicBoolean isInited = new AtomicBoolean(false);
+    private static ScheduledExecutorService statExecutor = 
Executors.newScheduledThreadPool(5);
+
+    /**
+     * init
+     * 
+     * @param commonProperties
+     */
+    public static void init(Map<String, String> commonProperties) {
+        if (!isInited.compareAndSet(false, true)) {
+            return;
+        }
+        // init
+        Context context = new Context(commonProperties);
+        // get domain name list
+        String metricDomains = 
context.getString(MetricListener.KEY_METRIC_DOMAINS);
+        if (StringUtils.isBlank(metricDomains)) {
+            return;
+        }
+        // split domain name
+        String[] domains = metricDomains.split("\\s+");
+        for (String domain : domains) {
+            // get domain parameters
+            Context domainContext = new Context(
+                    context.getSubProperties(MetricListener.KEY_METRIC_DOMAINS 
+ "." + domain + "."));
+            List<MetricListener> listenerList = parseDomain(domain, 
domainContext);
+            // no listener
+            if (listenerList == null || listenerList.size() <= 0) {
+                continue;
+            }
+            // get snapshot interval
+            long snapshotInterval = 
domainContext.getLong(MetricListener.KEY_SNAPSHOT_INTERVAL, 60000L);
+            LOG.info("begin to register 
domain:{},MetricListeners:{},snapshotInterval:{}", domain, listenerList,
+                    snapshotInterval);
+            statExecutor.scheduleWithFixedDelay(new 
MetricListenerRunnable(domain, listenerList), snapshotInterval,
+                    snapshotInterval, TimeUnit.MILLISECONDS);
+        }
+
+    }
+
+    /**
+     * parseDomain
+     * 
+     * @param  domain
+     * @param  context
+     * @return
+     */
+    private static List<MetricListener> parseDomain(String domain, Context 
domainContext) {
+        String listeners = 
domainContext.getString(MetricListener.KEY_DOMAIN_LISTENERS);
+        if (StringUtils.isBlank(listeners)) {
+            return null;
+        }
+        String[] listenerTypes = listeners.split("\\s+");
+        List<MetricListener> listenerList = new ArrayList<>();
+        for (String listenerType : listenerTypes) {
+            // new listener object
+            try {
+                Class<?> listenerClass = ClassUtils.getClass(listenerType);
+                Object listenerObject = 
listenerClass.getDeclaredConstructor().newInstance();
+                if (listenerObject == null || !(listenerObject instanceof 
MetricListener)) {
+                    LOG.error("{} is not instance of MetricListener.", 
listenerType);
+                    continue;
+                }
+                final MetricListener listener = (MetricListener) 
listenerObject;
+                listenerList.add(listener);
+            } catch (Throwable t) {
+                LOG.error("Fail to init MetricListener:{},error:{}",
+                        listenerType, t.getMessage());
+                continue;
+            }
+        }
+        return listenerList;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
new file mode 100644
index 0000000..6120a42
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItem.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.Event;
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.sort.standalone.utils.Constants;
+
+/**
+ * 
+ * SortMetricItem
+ */
+@MetricDomain(name = "Sort")
+public class SortMetricItem extends MetricItem {
+
+    public static final String KEY_CLUSTER_ID = "clusterId";// sortClusterId
+    public static final String KEY_TASK_NAME = "taskName";// sortTaskId
+    public static final String KEY_SOURCE_ID = "sourceId";// cacheClusterId
+    public static final String KEY_SOURCE_DATA_ID = "sourceDataId";// topic
+    public static final String KEY_INLONG_GROUP_ID = "inlongGroupId";
+    public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
+    public static final String KEY_SINK_ID = "sinkId";// sortDestinationId
+    public static final String KEY_SINK_DATA_ID = "sinkDataId";// topic or 
dest ip
+    //
+    public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
+    public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
+    public static final String M_READ_FAIL_COUNT = "readFailCount";
+    public static final String M_READ_FAIL_SIZE = "readFailSize";
+    public static final String M_SEND_COUNT = "sendCount";
+    public static final String M_SEND_SIZE = "sendSize";
+    public static final String M_SEND_SUCCESS_COUNT = "sendSuccessCount";
+    public static final String M_SEND_SUCCESS_SIZE = "sendSuccessSize";
+    public static final String M_SEND_FAIL_COUNT = "sendFailCount";
+    public static final String M_SEND_FAIL_SIZE = "sendFailSize";
+    //
+    public static final String M_SINK_DURATION = "sinkDuration";
+    public static final String M_NODE_DURATION = "nodeDuration";
+    public static final String M_WHOLE_DURATION = "wholeDuration";
+
+    @Dimension
+    public String clusterId;
+    @Dimension
+    public String taskName;
+    @Dimension
+    public String sourceId;
+    @Dimension
+    public String sourceDataId;
+    @Dimension
+    public String inlongGroupId;
+    @Dimension
+    public String inlongStreamId;
+    @Dimension
+    public String sinkId;
+    @Dimension
+    public String sinkDataId;
+    @CountMetric
+    public AtomicLong readSuccessCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong readSuccessSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong readFailCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong readFailSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendSuccessCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendSuccessSize = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendFailCount = new AtomicLong(0);
+    @CountMetric
+    public AtomicLong sendFailSize = new AtomicLong(0);
+    @CountMetric
+    // sinkCallbackTime - sinkBeginTime(milliseconds)
+    public AtomicLong sinkDuration = new AtomicLong(0);
+    @CountMetric
+    // sinkCallbackTime - sourceReceiveTime(milliseconds)
+    public AtomicLong nodeDuration = new AtomicLong(0);
+    @CountMetric
+    // sinkCallbackTime - eventCreateTime(milliseconds)
+    public AtomicLong wholeDuration = new AtomicLong(0);
+
+    /**
+     * fillInlongId
+     * 
+     * @param event
+     * @param dimensions
+     */
+    public static void fillInlongId(Event event, Map<String, String> 
dimensions) {
+        Map<String, String> headers = event.getHeaders();
+        String inlongGroupId = headers.getOrDefault(Constants.INLONG_GROUP_ID, 
"-");
+        String inlongStreamId = 
headers.getOrDefault(Constants.INLONG_STREAM_ID, "-");
+        dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+        dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItemSet.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItemSet.java
new file mode 100644
index 0000000..591d962
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/SortMetricItemSet.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemSet;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * MetaSinkMetricItemSet
+ */
+@MetricDomain(name = "Sort")
+public class SortMetricItemSet extends MetricItemSet<SortMetricItem> {
+
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortMetricItemSet.class);
+
+    /**
+     * Constructor
+     * 
+     * @param name
+     */
+    public SortMetricItemSet(String name) {
+        super(name);
+    }
+
+    /**
+     * createItem
+     * 
+     * @return
+     */
+    @Override
+    protected SortMetricItem createItem() {
+        return new SortMetricItem();
+    }
+
+    /**
+     * snapshot
+     * 
+     * @return
+     */
+    public List<MetricItem> snapshot() {
+        List<MetricItem> snapshot = super.snapshot();
+        return snapshot;
+    }
+
+    /**
+     * getItemMap
+     * 
+     * @return
+     */
+    public Map<String, SortMetricItem> getItemMap() {
+        return this.itemMap;
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
new file mode 100644
index 0000000..32a6a31
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/BufferQueue.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * BufferQueue
+ */
+public class BufferQueue<A> {
+
+    private final LinkedBlockingQueue<A> queue;
+    private final SizeSemaphore currentTokens;
+    private SizeSemaphore globalTokens;
+    private final AtomicLong offerCount = new AtomicLong(0);
+    private final AtomicLong pollCount = new AtomicLong(0);
+
+    /**
+     * Constructor
+     * 
+     * @param maxSizeKb
+     */
+    public BufferQueue(int maxSizeKb) {
+        this.queue = new LinkedBlockingQueue<>();
+        this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param maxSizeKb
+     * @param globalTokens
+     */
+    public BufferQueue(int maxSizeKb, SizeSemaphore globalTokens) {
+        this(maxSizeKb);
+        this.globalTokens = globalTokens;
+    }
+
+    /**
+     * pollRecord
+     */
+    public A pollRecord() {
+        A record = queue.poll();
+        this.pollCount.getAndIncrement();
+        return record;
+    }
+
+    /**
+     * offer
+     */
+    public void offer(A record) {
+        if (record == null) {
+            return;
+        }
+        queue.offer(record);
+        this.offerCount.incrementAndGet();
+    }
+
+    /**
+     * queue size
+     */
+    public int size() {
+        return queue.size();
+    }
+
+    /**
+     * small change
+     */
+    public int leftKb() {
+        return currentTokens.leftSemaphore();
+    }
+
+    /**
+     * availablePermits
+     */
+    public int availablePermits() {
+        return currentTokens.availablePermits();
+    }
+
+    /**
+     * maxSizeKb
+     */
+    public int maxSizeKb() {
+        return currentTokens.maxSize();
+    }
+
+    /**
+     * getIdleRate
+     */
+    public double getIdleRate() {
+        double remaining = currentTokens.availablePermits();
+        return remaining * 100.0 / currentTokens.maxSize();
+    }
+
+    /**
+     * tryAcquire
+     */
+    public boolean tryAcquire(long sizeInByte) {
+        boolean cidResult = currentTokens.tryAcquire(sizeInByte);
+        if (!cidResult) {
+            return false;
+        }
+        if (this.globalTokens == null) {
+            return true;
+        }
+        boolean globalResult = this.globalTokens.tryAcquire(sizeInByte);
+        if (globalResult) {
+            return true;
+        }
+        currentTokens.release(sizeInByte);
+        return false;
+    }
+
+    /**
+     * acquire
+     */
+    public void acquire(long sizeInByte) {
+        currentTokens.acquire(sizeInByte);
+        if (this.globalTokens != null) {
+            globalTokens.acquire(sizeInByte);
+        }
+    }
+
+    /**
+     * release
+     */
+    public void release(long sizeInByte) {
+        if (this.globalTokens != null) {
+            this.globalTokens.release(sizeInByte);
+        }
+        this.currentTokens.release(sizeInByte);
+    }
+
+    /**
+     * get offerCount
+     * 
+     * @return the offerCount
+     */
+    public long getOfferCount() {
+        return offerCount.getAndSet(0);
+    }
+
+    /**
+     * get pollCount
+     * 
+     * @return the pollCount
+     */
+    public long getPollCount() {
+        return pollCount.getAndSet(0);
+    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
new file mode 100644
index 0000000..f67cb8a
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+/**
+ * 
+ * Constants
+ */
+public interface Constants {
+
+    String INLONG_GROUP_ID = "inlongGroupId";
+    String INLONG_STREAM_ID = "inlongStreamId";
+    String TOPIC = "topic";
+    String HEADER_KEY_MSG_TIME = "msgTime";
+    String HEADER_KEY_SOURCE_IP = "sourceIp";
+    String HEADER_KEY_SOURCE_TIME = "sourceTime";
+    String MESSAGE_KEY = "messageKey";
+    
+    String RELOAD_INTERVAL = "reloadInterval";
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
new file mode 100644
index 0000000..d4bdea7
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * InlongLoggerFactory
+ */
+public class InlongLoggerFactory {
+
+    /**
+     * getLogger
+     * 
+     * @param  clazz
+     * @return
+     */
+    public static Logger getLogger(Class<?> clazz) {
+        String className = clazz.getName();
+        String namePrefix = getClassNamePrefix(className, 3);
+        return LoggerFactory.getLogger(namePrefix);
+    }
+
+    /**
+     * getClassNamePrefix
+     * 
+     * @param  className
+     * @param  layer
+     * @return
+     */
+    public static String getClassNamePrefix(String className, int layer) {
+        int index = 0;
+        for (int i = 0; i < layer; i++) {
+            int newIndex = className.indexOf('.', index + 1);
+            if (newIndex <= 0) {
+                break;
+            }
+            index = newIndex;
+        }
+        if (index == 0) {
+            return "Inlong";
+        }
+        String namePrefix = className.substring(0, index);
+        return namePrefix;
+    }
+
+//    /**
+//     * main
+//     * @param args
+//     */
+//    public static void main(String[] args) {
+//        int layer = 3;
+//        String className = "";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//        className = "ccc";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//        className = "org.ccc";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//        className = "org.apache.ccc";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//        className = "org.apache.inlong.ccc";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//        className = "org.apache.inlong.sort.ccc";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//        className = "org.apache.inlong.sort.standalone.ccc";
+//        System.out.println(className + ":" + getClassNamePrefix(className, 
layer));
+//    }
+}
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/SizeSemaphore.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/SizeSemaphore.java
new file mode 100644
index 0000000..31b5334
--- /dev/null
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/SizeSemaphore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.utils;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SizeSemaphore
+ */
+public class SizeSemaphore {
+
+    public static final int ONEKB = 1024;
+
+    private int maxSize = 0;
+    private int leftSize = 0;
+    private Semaphore sizeSemaphore = null;
+    private AtomicInteger leftSemaphore = new AtomicInteger(0);
+
+    /**
+     * Constructor
+     * 
+     * @param maxSize
+     * @param leftSize
+     */
+    public SizeSemaphore(int maxSize, int leftSize) {
+        this.maxSize = maxSize;
+        this.leftSize = leftSize;
+        this.sizeSemaphore = new Semaphore(maxSize, true);
+    }
+
+    /**
+     * small change
+     */
+    public int leftSemaphore() {
+        return leftSemaphore.get();
+    }
+
+    /**
+     * availablePermits
+     */
+    public int availablePermits() {
+        return sizeSemaphore.availablePermits();
+    }
+
+    /**
+     * maxSize
+     */
+    public int maxSize() {
+        return maxSize;
+    }
+
+    /**
+     * getIdleRate
+     */
+    public double getIdleRate() {
+        double remaining = sizeSemaphore.availablePermits();
+        return remaining * 100.0 / maxSize;
+    }
+
+    /**
+     * tryAcquire
+     */
+    public boolean tryAcquire(long sizeInByte) {
+        int sizeInKb = (int) (sizeInByte / leftSize);
+        int sizeChange = (int) (sizeInByte % leftSize);
+        if (leftSemaphore.get() - sizeChange < 0) {
+            boolean result = sizeSemaphore.tryAcquire(sizeInKb + 1);
+            if (result) {
+                leftSemaphore.addAndGet(-sizeChange + leftSize);
+            }
+            return result;
+        } else {
+            boolean result = sizeSemaphore.tryAcquire(sizeInKb);
+            if (result) {
+                leftSemaphore.addAndGet(-sizeChange);
+            }
+            return result;
+        }
+    }
+
+    /**
+     * acquire
+     */
+    public void acquire(long sizeInByte) {
+        int sizeInKb = (int) (sizeInByte / leftSize);
+        int sizeChange = (int) (sizeInByte % leftSize);
+        if (leftSemaphore.get() - sizeChange < 0) {
+            sizeSemaphore.acquireUninterruptibly(sizeInKb + 1);
+            leftSemaphore.addAndGet(-sizeChange + leftSize);
+        } else {
+            sizeSemaphore.acquireUninterruptibly(sizeInKb);
+            leftSemaphore.addAndGet(-sizeChange);
+        }
+    }
+
+    /**
+     * release
+     */
+    public void release(long sizeInByte) {
+        int sizeInKb = (int) (sizeInByte / leftSize);
+        int sizeChange = (int) (sizeInByte % leftSize);
+        if (leftSemaphore.get() + sizeChange > leftSize) {
+            sizeSemaphore.release(sizeInKb + 1);
+            leftSemaphore.addAndGet(sizeChange - leftSize);
+        } else {
+            sizeSemaphore.release(sizeInKb);
+            leftSemaphore.addAndGet(sizeChange);
+        }
+    }
+}

Reply via email to