This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a194f575f9 [INLONG-9117][Agent] Rewrite class RocksDbImp to enable it
to be constructed with a child path (#9119)
a194f575f9 is described below
commit a194f575f9cf72a005634e5f20144668013e9e09
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Oct 26 16:58:02 2023 +0800
[INLONG-9117][Agent] Rewrite class RocksDbImp to enable it to be
constructed with a child path (#9119)
---
.../org/apache/inlong/agent/db/KeyValueEntity.java | 8 ++++
.../org/apache/inlong/agent/db/RocksDbImp.java | 46 ++++++----------------
.../org/apache/inlong/agent/db/TestRocksDbImp.java | 37 +----------------
.../org/apache/inlong/agent/core/AgentManager.java | 8 ++--
.../inlong/agent/plugin/utils/RocksDBUtils.java | 6 ++-
5 files changed, 29 insertions(+), 76 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index 5cbda1e4f0..5ca4dadcb6 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.db;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
/**
@@ -90,6 +91,13 @@ public class KeyValueEntity {
return TriggerProfile.parseJsonStr(getJsonValue());
}
+ /**
+ * convert keyValue to offset profile
+ */
+ public OffsetProfile getAsOffsetProfile() {
+ return OffsetProfile.parseJsonStr(getJsonValue());
+ }
+
/**
* check whether the entity is finished
*/
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index d8dd2aa522..eac8479aa4 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -62,10 +62,10 @@ public class RocksDbImp implements Db {
private ConcurrentHashMap<String, ColumnFamilyDescriptor>
columnDescriptorMap;
private String storePath;
- public RocksDbImp() {
+ public RocksDbImp(String childPath) {
// init rocks db
this.conf = AgentConfiguration.getAgentConf();
- this.db = initEnv();
+ this.db = initEnv(childPath);
// add a command column family
addColumnFamily(commandFamilyName);
}
@@ -74,10 +74,10 @@ public class RocksDbImp implements Db {
return new ColumnFamilyDescriptor(columnFamilyName, new
ColumnFamilyOptions());
}
- private RocksDB initEnv() {
- String configPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
+ private RocksDB initEnv(String childPath) {
String parentPath = conf.get(AgentConstants.AGENT_HOME,
AgentConstants.DEFAULT_AGENT_HOME);
- File finalPath = new File(parentPath, configPath);
+ LOGGER.info("parentPath {} childPath {}", parentPath, childPath);
+ File finalPath = new File(parentPath, childPath);
storePath = finalPath.getAbsolutePath();
RocksDB.loadLibrary();
final Options options = new Options();
@@ -160,23 +160,12 @@ public class RocksDbImp implements Db {
@Override
public CommandEntity getCommand(String commandId) {
- try {
- byte[] bytes = db.get(columnHandlesMap.get(commandFamilyName),
commandId.getBytes());
- return bytes == null ? null : GSON.fromJson(new String(bytes),
CommandEntity.class);
- } catch (Exception e) {
- throw new RuntimeException("get command value error", e);
- }
+ return null;
}
@Override
public CommandEntity putCommand(CommandEntity entity) {
- requireNonNull(entity);
- try {
- db.put(columnHandlesMap.get(commandFamilyName),
entity.getId().getBytes(), GSON.toJson(entity).getBytes());
- } catch (Exception e) {
- throw new RuntimeException("put value to rocks db error", e);
- }
- return entity;
+ return null;
}
@Override
@@ -228,6 +217,11 @@ public class RocksDbImp implements Db {
return results;
}
+ @Override
+ public List<CommandEntity> searchCommands(boolean isAcked) {
+ return null;
+ }
+
@Override
public List<KeyValueEntity> search(StateSearchKey searchKey) {
List<KeyValueEntity> results = new LinkedList<>();
@@ -260,22 +254,6 @@ public class RocksDbImp implements Db {
return results;
}
- @Override
- public List<CommandEntity> searchCommands(boolean isAcked) {
- List<CommandEntity> results = new LinkedList<>();
- try (final RocksIterator it =
db.newIterator(columnHandlesMap.get(commandFamilyName))) {
- it.seekToFirst();
- while (it.isValid()) {
- CommandEntity commandEntity = GSON.fromJson(new
String(it.value()), CommandEntity.class);
- if (commandEntity.isAcked() == isAcked) {
- results.add(commandEntity);
- }
- it.next();
- }
- }
- return results;
- }
-
@Override
public KeyValueEntity searchOne(StateSearchKey searchKey) {
try (final RocksIterator it =
db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
diff --git
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
index 3a41ef8644..eccb9c298e 100644
---
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
+++
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
@@ -18,9 +18,6 @@
package org.apache.inlong.agent.db;
import org.apache.inlong.agent.AgentBaseTestsHelper;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.common.db.CommandEntity;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -30,10 +27,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-
public class TestRocksDbImp {
private static RocksDbImp db;
@@ -42,7 +35,7 @@ public class TestRocksDbImp {
@BeforeClass
public static void setup() throws Exception {
helper = new
AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome();
- db = new RocksDbImp();
+ db = new RocksDbImp("/localdb");
}
@AfterClass
@@ -81,22 +74,6 @@ public class TestRocksDbImp {
db.put(entity);
KeyValueEntity newEntity = db.get("test1");
Assert.assertEquals("testC", newEntity.getJsonValue());
-
- }
-
- @Test
- public void testCommandDb() {
- CommandEntity commandEntity = new CommandEntity();
- commandEntity.setId("1");
- commandEntity.setCommandResult(0);
- commandEntity.setAcked(false);
- commandEntity.setTaskId(1);
- commandEntity.setVersion(1);
- db.putCommand(commandEntity);
- CommandEntity command = db.getCommand("1");
- Assert.assertEquals("1", command.getId());
- List<CommandEntity> commandEntities = db.searchCommands(false);
- Assert.assertEquals("1", commandEntities.get(0).getId());
}
@Test
@@ -115,16 +92,4 @@ public class TestRocksDbImp {
KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
Assert.assertEquals("searchKey1", entityResult.getKey());
}
-
- @Test
- public void testBinlogJobStore() {
- JobProfile jobProfile = JobProfile.parseJsonFile("binlogJob.json");
- JobProfileDb jobDb = new JobProfileDb(db);
- String jobId = jobProfile.get(JOB_ID);
- jobProfile.set(JOB_INSTANCE_ID,
AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
- jobDb.storeJobFirstTime(jobProfile);
- List<JobProfile> restarts = jobDb.getRestartJobs();
- Assert.assertEquals(1, restarts.size());
- }
-
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index c3e2344b27..315667d86a 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -29,6 +29,7 @@ import org.apache.inlong.agent.core.trigger.TriggerManager;
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.db.RocksDbImp;
import org.apache.inlong.agent.db.TriggerProfileDb;
import org.slf4j.Logger;
@@ -102,11 +103,8 @@ public class AgentManager extends AbstractDaemon {
*/
private Db initDb() {
try {
- // db is a required component, so if not init correctly,
- // throw exception and stop running.
- return (Db) Class.forName(conf.get(
- AgentConstants.AGENT_DB_CLASSNAME,
AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
- .newInstance();
+ String childPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
+ return new RocksDbImp(childPath);
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
index 469acf387e..a16914dca3 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
@@ -17,7 +17,9 @@
package org.apache.inlong.agent.plugin.utils;
+import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.RocksDbImp;
@@ -31,7 +33,9 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
public class RocksDBUtils {
public static void main(String[] args) {
- Db db = new RocksDbImp();
+ AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+ Db db = new RocksDbImp(
+ agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
upgrade(db);
}