This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6da93a43d1 [INLONG-10410][Agent] Add ZK plugin to save offset info 
(#10411)
6da93a43d1 is described below

commit 6da93a43d1ddbc8104ce107456b1902b3031b96b
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jun 14 09:53:39 2024 +0800

    [INLONG-10410][Agent] Add ZK plugin to save offset info (#10411)
    
    * [INLONG-10410][Agent] Add ZK plugin to save offset info
    
    * [INLONG-10410][Agent] Add ZK plugin to save offset info
    
    * Update 
inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * Update 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
    
    Co-authored-by: AloysZhang <[email protected]>
    
    * Update 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZKImp.java
    
    Co-authored-by: Charles Zhang <[email protected]>
    
    * [INLONG-10410][Agent] Add ZK plugin to save offset info
    
    * [INLONG-10410][Agent] Add ZK plugin to save offset info
    
    ---------
    
    Co-authored-by: AloysZhang <[email protected]>
    Co-authored-by: Charles Zhang <[email protected]>
---
 .../inlong/agent/constant/AgentConstants.java      |   9 +-
 .../inlong/agent/constant/CommonConstants.java     |   6 +-
 .../{RocksStoreImp.java => RocksDBStoreImpl.java}  |   8 +-
 ...ocksStoreImp.java => TestRocksDBStoreImpl.java} |   8 +-
 .../org/apache/inlong/agent/core/AgentManager.java |  12 +-
 .../apache/inlong/agent/core/task/TaskManager.java |  17 +-
 .../agent/core/instance/TestInstanceManager.java   |   6 +-
 .../inlong/agent/core/store/TestRocksDbKey.java    |  69 ++++++++
 .../inlong/agent/plugin/store/ZooKeeperImpl.java   | 185 +++++++++++++++++++++
 .../inlong/agent/plugin/utils/RocksDBUtils.java    |   4 +-
 .../agent/plugin/sources/TestLogFileSource.java    |   6 +-
 .../inlong/agent/plugin/store/TestStoreKey.java    |  76 +++++++++
 12 files changed, 368 insertions(+), 38 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 0d67e55f82..36d7272ddc 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -31,9 +31,9 @@ public class AgentConstants {
     public static final String DEFAULT_AGENT_HOME = 
System.getProperty("agent.home");
     public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
     public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";
-    public static final String AGENT_LOCAL_DB_PATH_TASK = ".localdb/task";
-    public static final String AGENT_LOCAL_DB_PATH_INSTANCE = 
".localdb/instance";
-    public static final String AGENT_LOCAL_DB_PATH_OFFSET = ".localdb/offset";
+    public static final String AGENT_STORE_PATH_TASK = ".localdb/task";
+    public static final String AGENT_STORE_PATH_INSTANCE = ".localdb/instance";
+    public static final String AGENT_STORE_PATH_OFFSET = ".localdb/offset";
     public static final String AGENT_UNIQ_ID = "agent.uniq.id";
     // default is empty.
     public static final String AGENT_FETCHER_CLASSNAME = 
"agent.fetcher.classname";
@@ -46,6 +46,9 @@ public class AgentConstants {
     public static final String AGENT_LOCAL_IP = "agent.local.ip";
     public static final String DEFAULT_LOCAL_IP = "127.0.0.1";
     public static final String DEFAULT_LOCAL_HOST = "localhost";
+    public static final String AGENT_STORE_CLASSNAME = "agent.store.classname";
+    public static final String DEFAULT_AGENT_STORE_CLASSNAME = 
"org.apache.inlong.agent.store.RocksDBStoreImpl";
+
     // default use local ip as uniq id for agent.
     public static final String DEFAULT_AGENT_UNIQ_ID = AgentUtils.getLocalIp();
     public static final String CUSTOM_FIXED_IP = "agent.custom.fixed.ip";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index a92b3222ad..45320406ef 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -89,9 +89,9 @@ public class CommonConstants {
     public static final String PROXY_KEY_DATA = "dataKey";
 
     public static final int DEFAULT_FILE_MAX_NUM = 4096;
-    public static final String TASK_ID_PREFIX = "task_";
-    public static final String INSTANCE_ID_PREFIX = "ins_";
-    public static final String OFFSET_ID_PREFIX = "offset_";
+    public static final String TASK_ID_PREFIX = "task";
+    public static final String INSTANCE_ID_PREFIX = "ins";
+    public static final String OFFSET_ID_PREFIX = "offset";
     public static final String AGENT_OS_NAME = "os.name";
     public static final String AGENT_NIX_OS = "nix";
     public static final String AGENT_NUX_OS = "nux";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java
similarity index 97%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java
index 47caee3703..b9be413f04 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java
@@ -47,9 +47,9 @@ import static java.util.Objects.requireNonNull;
 /**
  * Store implement based on the Rocks DB.
  */
-public class RocksStoreImp implements Store {
+public class RocksDBStoreImpl implements Store {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(RocksStoreImp.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RocksDBStoreImpl.class);
     private static final Gson GSON = new Gson();
     public static final String SPLITTER = "_";
     public static final String UNIQUE_KEY = "";
@@ -62,7 +62,7 @@ public class RocksStoreImp implements Store {
     private ConcurrentHashMap<String, ColumnFamilyDescriptor> 
columnDescriptorMap;
     private String storePath;
 
-    public RocksStoreImp(String childPath) {
+    public RocksDBStoreImpl(String childPath) {
         // init rocks db
         this.conf = AgentConfiguration.getAgentConf();
         this.db = initEnv(childPath);
@@ -126,7 +126,7 @@ public class RocksStoreImp implements Store {
         } else {
             LOGGER.info("loading column families :" + 
existing.stream().map(String::new).collect(Collectors.toList()));
             managedColumnFamilies.addAll(
-                    
existing.stream().map(RocksStoreImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
+                    
existing.stream().map(RocksDBStoreImpl::getColumnFamilyDescriptor).collect(Collectors.toList()));
         }
         return managedColumnFamilies;
     }
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java
similarity index 91%
rename from 
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
rename to 
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java
index e9dd639b0a..5d1c291815 100644
--- 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java
@@ -26,15 +26,15 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-public class TestRocksStoreImp {
+public class TestRocksDBStoreImpl {
 
-    private static RocksStoreImp store;
+    private static RocksDBStoreImpl store;
     private static AgentBaseTestsHelper helper;
 
     @BeforeClass
     public static void setup() throws Exception {
-        helper = new 
AgentBaseTestsHelper(TestRocksStoreImp.class.getName()).setupAgentHome();
-        store = new RocksStoreImp("/localdb");
+        helper = new 
AgentBaseTestsHelper(TestRocksDBStoreImpl.class.getName()).setupAgentHome();
+        store = new RocksDBStoreImpl("/localdb");
     }
 
     @AfterClass
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 6e8deba0ae..b00eebf869 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -29,10 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.lang.reflect.Constructor;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Agent Manager, the bridge for task manager, task store e.t.c it manages 
agent level operations and communicates
@@ -46,8 +44,7 @@ public class AgentManager extends AbstractDaemon {
     private final ProfileFetcher fetcher;
     private final AgentConfiguration conf;
     private final ExecutorService agentConfMonitor;
-    public static final int CONFIG_QUEUE_CAPACITY = 2;
-    private static BlockingQueue<AgentConfigInfo> configQueue = new 
LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
+    private static AgentConfigInfo agentConfigInfo;
 
     public AgentManager() {
         conf = AgentConfiguration.getAgentConf();
@@ -58,17 +55,14 @@ public class AgentManager extends AbstractDaemon {
     }
 
     public static AgentConfigInfo getAgentConfigInfo() {
-        return configQueue.peek();
+        return agentConfigInfo;
     }
 
     public void subNewAgentConfigInfo(AgentConfigInfo info) {
         if (info == null) {
             return;
         }
-        if (configQueue.size() == CONFIG_QUEUE_CAPACITY) {
-            configQueue.poll();
-        }
-        configQueue.add(info);
+        agentConfigInfo = info;
     }
 
     /**
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 5bbdf43432..c1b7da07f3 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -24,7 +24,6 @@ import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.store.RocksStoreImp;
 import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -34,6 +33,7 @@ import org.apache.inlong.common.enums.TaskStateEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -77,7 +77,7 @@ public class TaskManager extends AbstractDaemon {
     // tasks which are not accepted by running pool.
     private final BlockingQueue<Task> pendingTasks;
     private final int taskMaxLimit;
-    private final AgentConfiguration agentConf;
+    private static final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
     // instance profile queue.
     private final BlockingQueue<TaskAction> actionQueue;
 
@@ -124,14 +124,13 @@ public class TaskManager extends AbstractDaemon {
      * Init task manager.
      */
     public TaskManager() {
-        this.agentConf = AgentConfiguration.getAgentConf();
         taskBasicStore = initStore(
-                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
+                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_STORE_PATH_TASK));
         taskStore = new TaskStore(taskBasicStore);
         instanceBasicStore = initStore(
-                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
+                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_STORE_PATH_INSTANCE));
         offsetBasicStore =
-                initStore(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+                initStore(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_STORE_PATH_OFFSET));
         OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
         this.runningPool = new ThreadPoolExecutor(
                 0, Integer.MAX_VALUE,
@@ -160,7 +159,11 @@ public class TaskManager extends AbstractDaemon {
      */
     public static Store initStore(String childPath) {
         try {
-            return new RocksStoreImp(childPath);
+            Constructor<?> constructor =
+                    
Class.forName(agentConf.get(AgentConstants.AGENT_STORE_CLASSNAME,
+                            
AgentConstants.DEFAULT_AGENT_STORE_CLASSNAME)).getDeclaredConstructor(String.class);
+            constructor.setAccessible(true);
+            return (Store) constructor.newInstance(childPath);
         } catch (Exception ex) {
             throw new UnsupportedClassVersionError(ex.getMessage());
         }
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index f45e4c55e0..13c52bd505 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -54,12 +54,12 @@ public class TestInstanceManager {
     public static void setup() {
         helper = new 
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
-        Store basicStore = TaskManager.initStore("/localdb");
+        Store basicInstanceStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
         taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING, "GMT+6:00");
-        Store taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+        Store taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
         TaskStore taskStore = new TaskStore(taskBasicStore);
         taskStore.storeTask(taskProfile);
-        manager = new InstanceManager("1", 20, basicStore, taskStore);
+        manager = new InstanceManager("1", 20, basicInstanceStore, taskStore);
         manager.CORE_THREAD_SLEEP_TIME_MS = 100;
     }
 
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
new file mode 100644
index 0000000000..b854296872
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.store;
+
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.AgentBaseTestsHelper;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.OffsetStore;
+import org.apache.inlong.agent.store.TaskStore;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Checks the key strings produced by {@link TaskStore}, {@link InstanceStore} and {@link OffsetStore}
+ * when they are backed by the store returned from {@link TaskManager#initStore(String)}.
+ */
+public class TestRocksDbKey {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestRocksDbKey.class);
+    private static TaskStore taskStore;
+    private static InstanceStore instanceStore;
+    private static OffsetStore offsetStore;
+
+    private static AgentBaseTestsHelper helper;
+
+    /**
+     * Prepares a temporary agent home and opens one store per data kind (task, instance, offset).
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        helper = new AgentBaseTestsHelper(TestRocksDbKey.class.getName()).setupAgentHome();
+        taskStore = new TaskStore(TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK));
+        instanceStore = new InstanceStore(TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE));
+        offsetStore = new OffsetStore(TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET));
+    }
+
+    /**
+     * Removes the temporary agent home created in {@link #setup()}.
+     */
+    @AfterClass
+    public static void teardown() throws IOException {
+        helper.teardownAgentHome();
+    }
+
+    /**
+     * Verifies the exact key layout of each store.
+     * Direct equality assertions are used so that a failure prints both the expected and the actual key.
+     */
+    @Test
+    public void testStore() {
+        Assert.assertEquals("task", taskStore.getKey());
+        Assert.assertEquals("task_1", taskStore.getKeyByTaskId("1"));
+        Assert.assertEquals("ins_", instanceStore.getKey());
+        Assert.assertEquals("ins_1", instanceStore.getKeyByTaskId("1"));
+        Assert.assertEquals("ins_1_/data/log/123.log",
+                instanceStore.getKeyByTaskAndInstanceId("1", "/data/log/123.log"));
+        Assert.assertEquals("offset_1_/data/log/123.log", offsetStore.getKey("1", "/data/log/123.log"));
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZooKeeperImpl.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZooKeeperImpl.java
new file mode 100755
index 0000000000..b0df128e5b
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZooKeeperImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.store;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.core.AgentManager;
+import org.apache.inlong.agent.store.KeyValueEntity;
+import org.apache.inlong.agent.store.Store;
+
+import com.google.gson.Gson;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
+
+/**
+ * Store implement based on the ZooKeeper.
+ */
+public class ZooKeeperImpl implements Store {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperImpl.class);
+    private static final Gson GSON = new Gson();
+    /** Pause between two retries of a failed ZooKeeper operation, in milliseconds. */
+    public static final int SLEEP_MS_BETWEEN_RETRIES = 1000 * 10;
+    /** Maximum number of retries of a failed ZooKeeper operation. */
+    public static final int MAX_RETRIES = 10;
+    /** Session timeout handed to the Curator client, in milliseconds. */
+    public static final int SESSION_TIMEOUT_MS = 1000 * 60 * 5;
+    /** Connection timeout handed to the Curator client, in milliseconds. */
+    public static final int CONNECTION_TIMEOUT_MS = 1000 * 10;
+    /** Separator between key segments; identical to the ZooKeeper path separator. */
+    public static final String SPLITTER = "/";
+    // One client is shared by every ZooKeeperImpl instance. It is volatile because it is read
+    // outside the lock in getClient().
+    private static volatile CuratorFramework client;
+    private final String uniqueKey;
+    private static final AgentConfiguration conf = AgentConfiguration.getAgentConf();
+    private static final String ZK_PRE = "/agent";
+
+    /**
+     * Builds the root path of this store: /agent/{cluster tag}/{cluster name}/{local ip}/{childPath}.
+     * No connection is opened here; the client is created on first use.
+     *
+     * @param childPath last segment of the root path, distinguishing the kind of data kept in this store
+     */
+    public ZooKeeperImpl(String childPath) {
+        uniqueKey = ZK_PRE + getSplitter() + conf.get(AGENT_CLUSTER_TAG) + getSplitter() + conf.get(AGENT_CLUSTER_NAME)
+                + getSplitter() + conf.get(AGENT_LOCAL_IP) + getSplitter() + childPath;
+    }
+
+    /**
+     * Returns the shared client, creating and starting it on first use.
+     *
+     * @return zookeeper client
+     * @throws RuntimeException if the agent config (which carries the ZooKeeper url) is not available yet
+     */
+    private static CuratorFramework getClient() {
+        if (client == null) {
+            synchronized (ZooKeeperImpl.class) {
+                if (client == null) {
+                    if (AgentManager.getAgentConfigInfo() == null) {
+                        throw new RuntimeException("agent config is null");
+                    }
+                    // RetryNTimes expects (retry count, sleep between retries); keep that order.
+                    RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRIES, SLEEP_MS_BETWEEN_RETRIES);
+                    CuratorFramework created = CuratorFrameworkFactory.newClient(
+                            AgentManager.getAgentConfigInfo().getZkUrl(),
+                            SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, retryPolicy);
+                    created.start();
+                    // publish only after start() so other threads never see an unstarted client
+                    client = created;
+                }
+            }
+        }
+        return client;
+    }
+
+    /**
+     * Reads the entity stored at the given path.
+     *
+     * @param key full ZooKeeper path of the node
+     * @return the decoded entity, or null if the node does not exist or holds no decodable payload
+     * @throws RuntimeException if reading or decoding fails for any other reason
+     */
+    @Override
+    public KeyValueEntity get(String key) {
+        try {
+            byte[] bytes = getClient().getData().forPath(key);
+            // decode with the same charset put() uses for encoding
+            return bytes == null ? null
+                    : GSON.fromJson(new String(bytes, StandardCharsets.UTF_8), KeyValueEntity.class);
+        } catch (NoNodeException e) {
+            return null;
+        } catch (Exception e) {
+            throw new RuntimeException("get key value entity error", e);
+        }
+    }
+
+    /**
+     * Writes the entity as UTF-8 JSON to the node named by its key, creating the node (and missing
+     * parents) if it does not exist yet. Failures are logged and not propagated.
+     *
+     * @param entity entity to store; its key is used as the ZooKeeper path
+     */
+    @Override
+    public void put(KeyValueEntity entity) {
+        Stat stat;
+        try {
+            byte[] data = GSON.toJson(entity).getBytes(StandardCharsets.UTF_8);
+            stat = getClient().checkExists().forPath(entity.getKey());
+            if (stat == null) {
+                getClient().create().creatingParentsIfNeeded()
+                        .forPath(entity.getKey(), data);
+            } else {
+                getClient().setData().forPath(entity.getKey(), data);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Path {}, put has exception",
+                    entity.getKey(), e);
+        }
+    }
+
+    /**
+     * Deletes the node at the given path if it exists. Failures are logged and not propagated.
+     *
+     * @param key full ZooKeeper path of the node
+     * @return always null; the removed entity is not read back
+     */
+    @Override
+    public KeyValueEntity remove(String key) {
+        Stat stat;
+        try {
+            stat = getClient().checkExists().forPath(key);
+            if (stat != null) {
+                getClient().delete().forPath(key);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Path {}, remove has exception", key, e);
+        }
+        return null;
+    }
+
+    /**
+     * Collects the entities stored in all leaf nodes below the given path.
+     *
+     * @param prefix ZooKeeper path to search under
+     * @return entities of every leaf node that holds a decodable payload; never contains null
+     */
+    @Override
+    public List<KeyValueEntity> findAll(String prefix) {
+        List<KeyValueEntity> result = new ArrayList<>();
+        for (String node : getLeafNodes(prefix)) {
+            KeyValueEntity entity = get(node);
+            // A leaf may carry no payload (e.g. a parent whose children were all removed) or may
+            // vanish between listing and reading; skip it instead of returning a null element.
+            if (entity != null) {
+                result.add(entity);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Recursively lists the paths of all nodes below {@code path} that have no children.
+     * A missing node yields an empty list; other failures are logged and the nodes found so far
+     * are returned.
+     */
+    private List<String> getLeafNodes(String path) {
+        List<String> leafNodes = new ArrayList<>();
+        try {
+            List<String> children = getClient().getChildren().forPath(path);
+            if (children.isEmpty()) {
+                leafNodes.add(path);
+            } else {
+                for (String child : children) {
+                    leafNodes.addAll(getLeafNodes(path + getSplitter() + child));
+                }
+            }
+        } catch (NoNodeException e) {
+            return leafNodes;
+        } catch (Exception e) {
+            LOGGER.error("getLeafNodes path {} error", path, e);
+        }
+        return leafNodes;
+    }
+
+    @Override
+    public String getSplitter() {
+        return SPLITTER;
+    }
+
+    @Override
+    public String getUniqueKey() {
+        return uniqueKey;
+    }
+
+    /**
+     * Replaces every "/" with "#", e.g. the file name /data/log/test.log becomes #data#log#test.log,
+     * so that a file path does not add extra levels to the ZooKeeper path.
+     *
+     * @param source text that may contain "/"
+     * @return the text with every "/" replaced by "#"
+     */
+    @Override
+    public String replaceKeywords(String source) {
+        return source.replace("/", "#");
+    }
+
+    /**
+     * Closes the shared client if one was created. The cached reference is cleared so that a
+     * later operation opens a fresh client instead of using a closed one.
+     */
+    @Override
+    public void close() throws IOException {
+        synchronized (ZooKeeperImpl.class) {
+            // do not go through getClient(): closing must never create a new connection
+            if (client != null) {
+                client.close();
+                client = null;
+            }
+        }
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
index 740d2364aa..bcb04342ea 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.store.RocksStoreImp;
+import org.apache.inlong.agent.store.RocksDBStoreImpl;
 import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.store.TaskStore;
 
@@ -31,7 +31,7 @@ public class RocksDBUtils {
 
     public static void main(String[] args) {
         AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
-        Store store = new RocksStoreImp(
+        Store store = new RocksDBStoreImpl(
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
         upgrade(store);
     }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 70ab3170f9..2779dc6b3e 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -67,10 +67,10 @@ public class TestLogFileSource {
     @BeforeClass
     public static void setup() {
         helper = new 
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
-        taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
-        instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
+        taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+        instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
         offsetBasicStore =
-                
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
+                TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
         OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
new file mode 100644
index 0000000000..ba7ed6b0e1
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.store;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.OffsetStore;
+import org.apache.inlong.agent.store.TaskStore;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
+
+/**
+ * Checks the key strings produced by {@link TaskStore}, {@link InstanceStore} and {@link OffsetStore}
+ * when they are backed by {@link ZooKeeperImpl}. Only key construction is exercised; none of the
+ * calls made here reads from or writes to the store.
+ */
+public class TestStoreKey {
+
+    private static TaskStore taskStore;
+    private static InstanceStore instanceStore;
+    private static OffsetStore offsetStore;
+
+    private static AgentBaseTestsHelper helper;
+
+    /**
+     * Pins the local ip used in the key root, prepares a temporary agent home and creates one
+     * ZooKeeper-backed store per data kind (task, instance, offset).
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        AgentConfiguration.getAgentConf().set(AGENT_LOCAL_IP, "127.0.0.1");
+        helper = new AgentBaseTestsHelper(TestStoreKey.class.getName()).setupAgentHome();
+        taskStore = new TaskStore(new ZooKeeperImpl(AgentConstants.AGENT_STORE_PATH_TASK));
+        instanceStore = new InstanceStore(new ZooKeeperImpl(AgentConstants.AGENT_STORE_PATH_INSTANCE));
+        offsetStore = new OffsetStore(new ZooKeeperImpl(AgentConstants.AGENT_STORE_PATH_OFFSET));
+    }
+
+    /**
+     * Removes the temporary agent home created in {@link #setup()}.
+     */
+    @AfterClass
+    public static void teardown() throws IOException {
+        helper.teardownAgentHome();
+    }
+
+    /**
+     * Verifies the exact key layout of each store.
+     * The expected values presume the test configuration resolves the cluster tag to "default_tag"
+     * and the cluster name to "default_agent".
+     * Direct equality assertions are used so that a failure prints both the expected and the actual key.
+     */
+    @Test
+    public void testStore() {
+        Assert.assertEquals("/agent/default_tag/default_agent/127.0.0.1/.localdb/task/task",
+                taskStore.getKey());
+        Assert.assertEquals("/agent/default_tag/default_agent/127.0.0.1/.localdb/task/task/1",
+                taskStore.getKeyByTaskId("1"));
+        Assert.assertEquals("/agent/default_tag/default_agent/127.0.0.1/.localdb/instance/ins/",
+                instanceStore.getKey());
+        Assert.assertEquals("/agent/default_tag/default_agent/127.0.0.1/.localdb/instance/ins/1",
+                instanceStore.getKeyByTaskId("1"));
+        Assert.assertEquals(
+                "/agent/default_tag/default_agent/127.0.0.1/.localdb/instance/ins/1/#data#log#123.log",
+                instanceStore.getKeyByTaskAndInstanceId("1", "/data/log/123.log"));
+        Assert.assertEquals(
+                "/agent/default_tag/default_agent/127.0.0.1/.localdb/offset/offset/1/#data#log#123.log",
+                offsetStore.getKey("1", "/data/log/123.log"));
+    }
+}

Reply via email to