This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6da93a43d1 [INLONG-10410][Agent] Add ZK plugin to save offset info
(#10411)
6da93a43d1 is described below
commit 6da93a43d1ddbc8104ce107456b1902b3031b96b
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Jun 14 09:53:39 2024 +0800
[INLONG-10410][Agent] Add ZK plugin to save offset info (#10411)
* [INLONG-10410][Agent] Add ZK plugin to save offset info
* [INLONG-10410][Agent] Add ZK plugin to save offset info
* Update
inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
Co-authored-by: AloysZhang <[email protected]>
* Update
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
Co-authored-by: AloysZhang <[email protected]>
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZKImp.java
Co-authored-by: Charles Zhang <[email protected]>
* [INLONG-10410][Agent] Add ZK plugin to save offset info
* [INLONG-10410][Agent] Add ZK plugin to save offset info
---------
Co-authored-by: AloysZhang <[email protected]>
Co-authored-by: Charles Zhang <[email protected]>
---
.../inlong/agent/constant/AgentConstants.java | 9 +-
.../inlong/agent/constant/CommonConstants.java | 6 +-
.../{RocksStoreImp.java => RocksDBStoreImpl.java} | 8 +-
...ocksStoreImp.java => TestRocksDBStoreImpl.java} | 8 +-
.../org/apache/inlong/agent/core/AgentManager.java | 12 +-
.../apache/inlong/agent/core/task/TaskManager.java | 17 +-
.../agent/core/instance/TestInstanceManager.java | 6 +-
.../inlong/agent/core/store/TestRocksDbKey.java | 69 ++++++++
.../inlong/agent/plugin/store/ZooKeeperImpl.java | 185 +++++++++++++++++++++
.../inlong/agent/plugin/utils/RocksDBUtils.java | 4 +-
.../agent/plugin/sources/TestLogFileSource.java | 6 +-
.../inlong/agent/plugin/store/TestStoreKey.java | 76 +++++++++
12 files changed, 368 insertions(+), 38 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 0d67e55f82..36d7272ddc 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -31,9 +31,9 @@ public class AgentConstants {
public static final String DEFAULT_AGENT_HOME =
System.getProperty("agent.home");
public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";
- public static final String AGENT_LOCAL_DB_PATH_TASK = ".localdb/task";
- public static final String AGENT_LOCAL_DB_PATH_INSTANCE =
".localdb/instance";
- public static final String AGENT_LOCAL_DB_PATH_OFFSET = ".localdb/offset";
+ public static final String AGENT_STORE_PATH_TASK = ".localdb/task";
+ public static final String AGENT_STORE_PATH_INSTANCE = ".localdb/instance";
+ public static final String AGENT_STORE_PATH_OFFSET = ".localdb/offset";
public static final String AGENT_UNIQ_ID = "agent.uniq.id";
// default is empty.
public static final String AGENT_FETCHER_CLASSNAME =
"agent.fetcher.classname";
@@ -46,6 +46,9 @@ public class AgentConstants {
public static final String AGENT_LOCAL_IP = "agent.local.ip";
public static final String DEFAULT_LOCAL_IP = "127.0.0.1";
public static final String DEFAULT_LOCAL_HOST = "localhost";
+ public static final String AGENT_STORE_CLASSNAME = "agent.store.classname";
+ public static final String DEFAULT_AGENT_STORE_CLASSNAME =
"org.apache.inlong.agent.store.RocksDBStoreImpl";
+
// default use local ip as uniq id for agent.
public static final String DEFAULT_AGENT_UNIQ_ID = AgentUtils.getLocalIp();
public static final String CUSTOM_FIXED_IP = "agent.custom.fixed.ip";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index a92b3222ad..45320406ef 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -89,9 +89,9 @@ public class CommonConstants {
public static final String PROXY_KEY_DATA = "dataKey";
public static final int DEFAULT_FILE_MAX_NUM = 4096;
- public static final String TASK_ID_PREFIX = "task_";
- public static final String INSTANCE_ID_PREFIX = "ins_";
- public static final String OFFSET_ID_PREFIX = "offset_";
+ public static final String TASK_ID_PREFIX = "task";
+ public static final String INSTANCE_ID_PREFIX = "ins";
+ public static final String OFFSET_ID_PREFIX = "offset";
public static final String AGENT_OS_NAME = "os.name";
public static final String AGENT_NIX_OS = "nix";
public static final String AGENT_NUX_OS = "nux";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java
similarity index 97%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java
index 47caee3703..b9be413f04 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksDBStoreImpl.java
@@ -47,9 +47,9 @@ import static java.util.Objects.requireNonNull;
/**
* Store implement based on the Rocks DB.
*/
-public class RocksStoreImp implements Store {
+public class RocksDBStoreImpl implements Store {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RocksStoreImp.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RocksDBStoreImpl.class);
private static final Gson GSON = new Gson();
public static final String SPLITTER = "_";
public static final String UNIQUE_KEY = "";
@@ -62,7 +62,7 @@ public class RocksStoreImp implements Store {
private ConcurrentHashMap<String, ColumnFamilyDescriptor>
columnDescriptorMap;
private String storePath;
- public RocksStoreImp(String childPath) {
+ public RocksDBStoreImpl(String childPath) {
// init rocks db
this.conf = AgentConfiguration.getAgentConf();
this.db = initEnv(childPath);
@@ -126,7 +126,7 @@ public class RocksStoreImp implements Store {
} else {
LOGGER.info("loading column families :" +
existing.stream().map(String::new).collect(Collectors.toList()));
managedColumnFamilies.addAll(
-
existing.stream().map(RocksStoreImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
+
existing.stream().map(RocksDBStoreImpl::getColumnFamilyDescriptor).collect(Collectors.toList()));
}
return managedColumnFamilies;
}
diff --git
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java
similarity index 91%
rename from
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
rename to
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java
index e9dd639b0a..5d1c291815 100644
---
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
+++
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksDBStoreImpl.java
@@ -26,15 +26,15 @@ import org.junit.Test;
import java.io.IOException;
-public class TestRocksStoreImp {
+public class TestRocksDBStoreImpl {
- private static RocksStoreImp store;
+ private static RocksDBStoreImpl store;
private static AgentBaseTestsHelper helper;
@BeforeClass
public static void setup() throws Exception {
- helper = new
AgentBaseTestsHelper(TestRocksStoreImp.class.getName()).setupAgentHome();
- store = new RocksStoreImp("/localdb");
+ helper = new
AgentBaseTestsHelper(TestRocksDBStoreImpl.class.getName()).setupAgentHome();
+ store = new RocksDBStoreImpl("/localdb");
}
@AfterClass
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 6e8deba0ae..b00eebf869 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -29,10 +29,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.reflect.Constructor;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* Agent Manager, the bridge for task manager, task store e.t.c it manages
agent level operations and communicates
@@ -46,8 +44,7 @@ public class AgentManager extends AbstractDaemon {
private final ProfileFetcher fetcher;
private final AgentConfiguration conf;
private final ExecutorService agentConfMonitor;
- public static final int CONFIG_QUEUE_CAPACITY = 2;
- private static BlockingQueue<AgentConfigInfo> configQueue = new
LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
+ private static AgentConfigInfo agentConfigInfo;
public AgentManager() {
conf = AgentConfiguration.getAgentConf();
@@ -58,17 +55,14 @@ public class AgentManager extends AbstractDaemon {
}
public static AgentConfigInfo getAgentConfigInfo() {
- return configQueue.peek();
+ return agentConfigInfo;
}
public void subNewAgentConfigInfo(AgentConfigInfo info) {
if (info == null) {
return;
}
- if (configQueue.size() == CONFIG_QUEUE_CAPACITY) {
- configQueue.poll();
- }
- configQueue.add(info);
+ agentConfigInfo = info;
}
/**
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 5bbdf43432..c1b7da07f3 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -24,7 +24,6 @@ import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
-import org.apache.inlong.agent.store.RocksStoreImp;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -34,6 +33,7 @@ import org.apache.inlong.common.enums.TaskStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -77,7 +77,7 @@ public class TaskManager extends AbstractDaemon {
// tasks which are not accepted by running pool.
private final BlockingQueue<Task> pendingTasks;
private final int taskMaxLimit;
- private final AgentConfiguration agentConf;
+ private static final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
// instance profile queue.
private final BlockingQueue<TaskAction> actionQueue;
@@ -124,14 +124,13 @@ public class TaskManager extends AbstractDaemon {
* Init task manager.
*/
public TaskManager() {
- this.agentConf = AgentConfiguration.getAgentConf();
taskBasicStore = initStore(
- agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
+ agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_STORE_PATH_TASK));
taskStore = new TaskStore(taskBasicStore);
instanceBasicStore = initStore(
- agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
+ agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_STORE_PATH_INSTANCE));
offsetBasicStore =
- initStore(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+ initStore(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_STORE_PATH_OFFSET));
OffsetManager.init(taskBasicStore, instanceBasicStore,
offsetBasicStore);
this.runningPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
@@ -160,7 +159,11 @@ public class TaskManager extends AbstractDaemon {
*/
public static Store initStore(String childPath) {
try {
- return new RocksStoreImp(childPath);
+ Constructor<?> constructor =
+
Class.forName(agentConf.get(AgentConstants.AGENT_STORE_CLASSNAME,
+
AgentConstants.DEFAULT_AGENT_STORE_CLASSNAME)).getDeclaredConstructor(String.class);
+ constructor.setAccessible(true);
+ return (Store) constructor.newInstance(childPath);
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index f45e4c55e0..13c52bd505 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -54,12 +54,12 @@ public class TestInstanceManager {
public static void setup() {
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
- Store basicStore = TaskManager.initStore("/localdb");
+ Store basicInstanceStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING, "GMT+6:00");
- Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+ Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
- manager = new InstanceManager("1", 20, basicStore, taskStore);
+ manager = new InstanceManager("1", 20, basicInstanceStore, taskStore);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
new file mode 100644
index 0000000000..b854296872
--- /dev/null
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/store/TestRocksDbKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.store;
+
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.AgentBaseTestsHelper;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.OffsetStore;
+import org.apache.inlong.agent.store.TaskStore;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Verifies the keys generated by {@link TaskStore}, {@link InstanceStore} and {@link OffsetStore}
+ * when they are backed by the store returned from {@link TaskManager#initStore(String)}.
+ */
+public class TestRocksDbKey {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestRocksDbKey.class);
+    private static TaskStore taskStore;
+    private static InstanceStore instanceStore;
+    private static OffsetStore offsetStore;
+
+    private static AgentBaseTestsHelper helper;
+
+    /**
+     * Prepares the agent home directory and one store per data category.
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        helper = new AgentBaseTestsHelper(TestRocksDbKey.class.getName()).setupAgentHome();
+        taskStore = new TaskStore(TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK));
+        instanceStore = new InstanceStore(TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE));
+        offsetStore = new OffsetStore(TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET));
+    }
+
+    @AfterClass
+    public static void teardown() throws IOException {
+        helper.teardownAgentHome();
+    }
+
+    /**
+     * Checks every key format. The expected value is passed first so that a failure prints both
+     * the expected and the actual key instead of a bare compareTo() result.
+     */
+    @Test
+    public void testStore() {
+        Assert.assertEquals("task", taskStore.getKey());
+        Assert.assertEquals("task_1", taskStore.getKeyByTaskId("1"));
+        Assert.assertEquals("ins_", instanceStore.getKey());
+        Assert.assertEquals("ins_1", instanceStore.getKeyByTaskId("1"));
+        Assert.assertEquals("ins_1_/data/log/123.log",
+                instanceStore.getKeyByTaskAndInstanceId("1", "/data/log/123.log"));
+        Assert.assertEquals("offset_1_/data/log/123.log", offsetStore.getKey("1", "/data/log/123.log"));
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZooKeeperImpl.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZooKeeperImpl.java
new file mode 100755
index 0000000000..b0df128e5b
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/store/ZooKeeperImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.store;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.core.AgentManager;
+import org.apache.inlong.agent.store.KeyValueEntity;
+import org.apache.inlong.agent.store.Store;
+
+import com.google.gson.Gson;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
+
+/**
+ * {@link Store} implementation backed by ZooKeeper.
+ *
+ * <p>All instances share one lazily created {@link CuratorFramework} client. Values are
+ * {@link KeyValueEntity} objects serialized as UTF-8 JSON; {@link #getUniqueKey()} returns the
+ * znode path prefix {@code /agent/<clusterTag>/<clusterName>/<localIp>/<childPath>}.
+ */
+public class ZooKeeperImpl implements Store {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperImpl.class);
+    private static final Gson GSON = new Gson();
+    public static final int SLEEP_MS_BETWEEN_RETRIES = 1000 * 10;
+    public static final int MAX_RETRIES = 10;
+    public static final int SESSION_TIMEOUT_MS = 1000 * 60 * 5;
+    public static final int CONNECTION_TIMEOUT_MS = 1000 * 10;
+    public static final String SPLITTER = "/";
+    // volatile so that the double-checked locking in getClient() publishes the client safely
+    private static volatile CuratorFramework client;
+    private final String uniqueKey;
+    private static final AgentConfiguration conf = AgentConfiguration.getAgentConf();
+    private static final String ZK_PRE = "/agent";
+
+    /**
+     * Builds the path prefix of this store; no ZooKeeper connection is opened here.
+     *
+     * @param childPath last segment of the prefix, appended after the agent's local ip
+     */
+    public ZooKeeperImpl(String childPath) {
+        uniqueKey = ZK_PRE + getSplitter() + conf.get(AGENT_CLUSTER_TAG) + getSplitter()
+                + conf.get(AGENT_CLUSTER_NAME) + getSplitter() + conf.get(AGENT_LOCAL_IP) + getSplitter()
+                + childPath;
+    }
+
+    /**
+     * Returns the shared client, creating and starting it on first use.
+     *
+     * @return zookeeper client
+     * @throws RuntimeException if the agent config holding the ZooKeeper url is not available yet
+     */
+    private static CuratorFramework getClient() {
+        CuratorFramework current = client;
+        if (current == null) {
+            synchronized (ZooKeeperImpl.class) {
+                current = client;
+                if (current == null) {
+                    if (AgentManager.getAgentConfigInfo() == null) {
+                        throw new RuntimeException("agent config is null");
+                    }
+                    // RetryNTimes takes (retry count, sleep between retries in ms), in that order
+                    RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRIES, SLEEP_MS_BETWEEN_RETRIES);
+                    current = CuratorFrameworkFactory.newClient(AgentManager.getAgentConfigInfo().getZkUrl(),
+                            SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, retryPolicy);
+                    current.start();
+                    // publish only after start() so other threads never observe an unstarted client
+                    client = current;
+                }
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Reads and deserializes the entity stored at the given path.
+     *
+     * @param key znode path
+     * @return the stored entity, or null if the node does not exist or holds no data
+     * @throws RuntimeException if reading or parsing fails for any other reason
+     */
+    @Override
+    public KeyValueEntity get(String key) {
+        try {
+            byte[] bytes = getClient().getData().forPath(key);
+            if (bytes == null) {
+                return null;
+            }
+            // decode with the charset put() encodes with, not the platform default
+            return GSON.fromJson(new String(bytes, StandardCharsets.UTF_8), KeyValueEntity.class);
+        } catch (NoNodeException e) {
+            return null;
+        } catch (Exception e) {
+            throw new RuntimeException("get key value entity error", e);
+        }
+    }
+
+    /**
+     * Writes the entity as UTF-8 JSON to the path given by its key, creating the node (and any
+     * missing parents) if it does not exist yet. Failures are logged, not rethrown.
+     *
+     * @param entity entity to store
+     */
+    @Override
+    public void put(KeyValueEntity entity) {
+        try {
+            byte[] data = GSON.toJson(entity).getBytes(StandardCharsets.UTF_8);
+            Stat stat = getClient().checkExists().forPath(entity.getKey());
+            if (stat == null) {
+                getClient().create().creatingParentsIfNeeded().forPath(entity.getKey(), data);
+            } else {
+                getClient().setData().forPath(entity.getKey(), data);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Path {}, put has exception", entity.getKey(), e);
+        }
+    }
+
+    /**
+     * Deletes the node at the given path if it exists. Failures are logged, not rethrown.
+     *
+     * @param key znode path
+     * @return always null; the removed value is not read back
+     */
+    @Override
+    public KeyValueEntity remove(String key) {
+        try {
+            Stat stat = getClient().checkExists().forPath(key);
+            if (stat != null) {
+                getClient().delete().forPath(key);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Path {}, remove has exception", key, e);
+        }
+        return null;
+    }
+
+    /**
+     * Collects the entities stored in all leaf nodes below the given path.
+     *
+     * @param prefix znode path to search under
+     * @return the entities found; never contains null
+     */
+    @Override
+    public List<KeyValueEntity> findAll(String prefix) {
+        List<KeyValueEntity> result = new ArrayList<>();
+        for (String node : getLeafNodes(prefix)) {
+            KeyValueEntity entity = get(node);
+            // a childless node without a JSON payload, or one deleted meanwhile, yields null
+            if (entity != null) {
+                result.add(entity);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Recursively lists the paths of all childless nodes at or below the given path.
+     *
+     * @param path znode path to start from
+     * @return leaf paths; empty if the path does not exist. Other errors are logged and the
+     *         leaves collected so far are returned.
+     */
+    private List<String> getLeafNodes(String path) {
+        List<String> leafNodes = new ArrayList<>();
+        try {
+            List<String> children = getClient().getChildren().forPath(path);
+            if (children.isEmpty()) {
+                leafNodes.add(path);
+            } else {
+                for (String child : children) {
+                    leafNodes.addAll(getLeafNodes(path + getSplitter() + child));
+                }
+            }
+        } catch (NoNodeException e) {
+            return leafNodes;
+        } catch (Exception e) {
+            LOGGER.error("getLeafNodes path {} error", path, e);
+        }
+        return leafNodes;
+    }
+
+    @Override
+    public String getSplitter() {
+        return SPLITTER;
+    }
+
+    @Override
+    public String getUniqueKey() {
+        return uniqueKey;
+    }
+
+    /**
+     * Replaces every "/" with "#", e.g. the file name /data/log/test.log becomes
+     * #data#log#test.log, so that a file name does not add extra levels to the znode path.
+     *
+     * @param source string to convert
+     * @return string after replace
+     */
+    @Override
+    public String replaceKeywords(String source) {
+        return source.replace("/", "#");
+    }
+
+    /**
+     * Closes the shared client if one was created. The cached reference is cleared so that a
+     * closed client is never handed out again; calling close() without a client is a no-op.
+     */
+    @Override
+    public void close() throws IOException {
+        synchronized (ZooKeeperImpl.class) {
+            if (client != null) {
+                client.close();
+                client = null;
+            }
+        }
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
index 740d2364aa..bcb04342ea 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
@@ -21,7 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.store.RocksStoreImp;
+import org.apache.inlong.agent.store.RocksDBStoreImpl;
import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.store.TaskStore;
@@ -31,7 +31,7 @@ public class RocksDBUtils {
public static void main(String[] args) {
AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
- Store store = new RocksStoreImp(
+ Store store = new RocksDBStoreImpl(
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
upgrade(store);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 70ab3170f9..2779dc6b3e 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -67,10 +67,10 @@ public class TestLogFileSource {
@BeforeClass
public static void setup() {
helper = new
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
- taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
- instanceBasicStore =
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
+ taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+ instanceBasicStore =
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
offsetBasicStore =
-
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
+ TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
OffsetManager.init(taskBasicStore, instanceBasicStore,
offsetBasicStore);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
new file mode 100644
index 0000000000..ba7ed6b0e1
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/store/TestStoreKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.store;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.OffsetStore;
+import org.apache.inlong.agent.store.TaskStore;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
+
+/**
+ * Verifies the keys generated by {@link TaskStore}, {@link InstanceStore} and {@link OffsetStore}
+ * when they are backed by {@link ZooKeeperImpl}.
+ */
+public class TestStoreKey {
+
+    // path prefix shared by every expected key: /agent/<cluster tag>/<cluster name>/<local ip>
+    private static final String KEY_ROOT = "/agent/default_tag/default_agent/127.0.0.1";
+
+    private static TaskStore taskStore;
+    private static InstanceStore instanceStore;
+    private static OffsetStore offsetStore;
+
+    private static AgentBaseTestsHelper helper;
+
+    /**
+     * Pins the local ip used in the key prefix and creates one ZooKeeper-backed store per data
+     * category.
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        AgentConfiguration.getAgentConf().set(AGENT_LOCAL_IP, "127.0.0.1");
+        helper = new AgentBaseTestsHelper(TestStoreKey.class.getName()).setupAgentHome();
+        taskStore = new TaskStore(new ZooKeeperImpl(AgentConstants.AGENT_STORE_PATH_TASK));
+        instanceStore = new InstanceStore(new ZooKeeperImpl(AgentConstants.AGENT_STORE_PATH_INSTANCE));
+        offsetStore = new OffsetStore(new ZooKeeperImpl(AgentConstants.AGENT_STORE_PATH_OFFSET));
+    }
+
+    @AfterClass
+    public static void teardown() throws IOException {
+        helper.teardownAgentHome();
+    }
+
+    /**
+     * Checks every key format. The expected value is passed first so that a failure prints both
+     * the expected and the actual key instead of a bare compareTo() result.
+     */
+    @Test
+    public void testStore() {
+        Assert.assertEquals(KEY_ROOT + "/.localdb/task/task", taskStore.getKey());
+        Assert.assertEquals(KEY_ROOT + "/.localdb/task/task/1", taskStore.getKeyByTaskId("1"));
+        Assert.assertEquals(KEY_ROOT + "/.localdb/instance/ins/", instanceStore.getKey());
+        Assert.assertEquals(KEY_ROOT + "/.localdb/instance/ins/1", instanceStore.getKeyByTaskId("1"));
+        Assert.assertEquals(KEY_ROOT + "/.localdb/instance/ins/1/#data#log#123.log",
+                instanceStore.getKeyByTaskAndInstanceId("1", "/data/log/123.log"));
+        Assert.assertEquals(KEY_ROOT + "/.localdb/offset/offset/1/#data#log#123.log",
+                offsetStore.getKey("1", "/data/log/123.log"));
+    }
+}