This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch TUBEMQ-455 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit aec7c7629e3842de38dd2923748ab08bd39c0006 Author: yuanbo <[email protected]> AuthorDate: Mon Dec 21 14:43:57 2020 +0800 [TUBEMQ-459] init agent project --- .travis.yml | 1 + tubemq-agent/agent-common/pom.xml | 90 ++++++ .../tubemq/agent/conf/AgentConfiguration.java | 112 +++++++ .../apache/tubemq/agent/conf/Configuration.java | 260 ++++++++++++++++ .../tubemq/agent/constants/AgentConstants.java | 130 ++++++++ .../org/apache/tubemq/agent/utils/AgentUtils.java | 131 ++++++++ tubemq-agent/agent-core/pom.xml | 66 ++++ .../java/org/apache/tubemq/agent/AgentMain.java | 90 ++++++ .../apache/tubemq/agent/core/TestAgentMain.java | 44 +++ tubemq-agent/pom.xml | 331 +++++++++++++++++++++ 10 files changed, 1255 insertions(+) diff --git a/.travis.yml b/.travis.yml index 9e4e758..0072542 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,6 +31,7 @@ cache: script: - mvn -B -V -e verify + - cd tubemq-agent && mvn clean package after_success: - bash <(curl -s https://codecov.io/bash) || echo 'Codecov failed to upload' diff --git a/tubemq-agent/agent-common/pom.xml b/tubemq-agent/agent-common/pom.xml new file mode 100644 index 0000000..2546b3e --- /dev/null +++ b/tubemq-agent/agent-common/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.tubemq</groupId> + <artifactId>tubemq-agent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>agent-common</artifactId> + <modelVersion>4.0.0</modelVersion> + + <dependencies> + <dependency> + <artifactId>commons-dbutils</artifactId> + <groupId>commons-dbutils</groupId> + </dependency> + <dependency> + <artifactId>je</artifactId> + <groupId>com.sleepycat</groupId> + </dependency> + <dependency> + <artifactId>commons-lang3</artifactId> + <groupId>org.apache.commons</groupId> + </dependency> + <dependency> + <artifactId>gson</artifactId> + <groupId>com.google.code.gson</groupId> + </dependency> + <dependency> + <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + </dependency> + <dependency> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + </dependency> + <dependency> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </dependency> + + <dependency> + <artifactId>commons-cli</artifactId> + <groupId>commons-cli</groupId> + </dependency> + + <dependency> + <artifactId>commons-io</artifactId> + <groupId>commons-io</groupId> + </dependency> + + <dependency> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>3.2.0</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/AgentConfiguration.java 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/AgentConfiguration.java new file mode 100644 index 0000000..6a99f5d --- /dev/null +++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/AgentConfiguration.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tubemq.agent.conf; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.io.FileUtils; +import org.apache.tubemq.agent.constants.AgentConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * agent configuration. Only one instance in the process. + * Basically it use properties file to store configurations. 
+ */ +public class AgentConfiguration extends Configuration { + + private static final Logger LOGGER = LoggerFactory.getLogger(AgentConfiguration.class); + + private static final String DEFAULT_CONFIG_FILE = "agent.properties"; + private static final String TMP_CONFIG_FILE = ".tmp.agent.properties"; + + private static final ArrayList<String> LOCAL_RESOURCES = new ArrayList<>(); + + private static final ReadWriteLock LOCK = new ReentrantReadWriteLock(); + + static { + LOCAL_RESOURCES.add(DEFAULT_CONFIG_FILE); + } + + private static AgentConfiguration agentConf = null; + + /** + * load config from agent file. + */ + private AgentConfiguration() { + // 初始化配置文件 + for (String fileName : LOCAL_RESOURCES) { + super.loadPropertiesResource(fileName); + } + } + + /** + * singleton for agent configuration. + * @return - static instance of AgentConfiguration + */ + public static AgentConfiguration getAgentConf() { + if (agentConf == null) { + synchronized (AgentConfiguration.class) { + if (agentConf == null) { + agentConf = new AgentConfiguration(); + } + } + } + return agentConf; + } + + private String getNextBackupFileName() { + SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); + String dateStr = format.format(new Date(System.currentTimeMillis())); + return DEFAULT_CONFIG_FILE + "." + dateStr; + } + + /** + * flush config to local files. + */ + public void flushToLocalPropertiesFile() { + LOCK.writeLock().lock(); + // TODO: flush to local file as properties file. 
+ try { + String agentConfParent = get( + AgentConstants.AGENT_CONF_PARENT, AgentConstants.DEFAULT_AGENT_CONF_PARENT); + File sourceFile = new File(agentConfParent, DEFAULT_CONFIG_FILE); + File targetFile = new File(agentConfParent, getNextBackupFileName()); + File tmpFile = new File(agentConfParent, TMP_CONFIG_FILE); + if (sourceFile.exists()) { + FileUtils.copyFile(sourceFile, targetFile); + } + List<String> tmpCache = getStorageList(); + FileUtils.writeLines(tmpFile, tmpCache); + + FileUtils.copyFile(tmpFile, sourceFile); + tmpFile.delete(); + } catch (Exception ex) { + LOGGER.error("error while flush agent conf to local", ex); + } finally { + LOCK.writeLock().unlock(); + } + + } + + @Override + public boolean allRequiredKeyExist() { + return true; + } +} diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/Configuration.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/Configuration.java new file mode 100644 index 0000000..3a924b2 --- /dev/null +++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/Configuration.java @@ -0,0 +1,260 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tubemq.agent.conf; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.tubemq.agent.utils.AgentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class Configuration { + + private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class); + private static final JsonParser JSON_PARSER = new JsonParser(); + + private final Map<String, JsonPrimitive> configStorage = new HashMap<>(); + + // get config file by class loader + private ClassLoader classLoader; + + { + classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = AgentConfiguration.class.getClassLoader(); + } + } + + public abstract boolean allRequiredKeyExist(); + + /** + * support load config file from json/properties file. 
+ * + * @param fileName - file name + * @param isJson - whether is json file + */ + private void loadResource(String fileName, boolean isJson) { + Reader reader = null; + try { + InputStream inputStream = classLoader.getResourceAsStream(fileName); + if (inputStream != null) { + reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + if (isJson) { + JsonElement tmpElement = JSON_PARSER.parse(reader).getAsJsonObject(); + updateConfig(new HashMap<>(), 0, tmpElement); + } else { + Properties properties = new Properties(); + properties.load(reader); + properties.forEach((key, value) -> configStorage.put((String) key, + new JsonPrimitive((String) value))); + } + } + } catch (Exception ioe) { + LOGGER.error("error init {}", fileName, ioe); + } finally { + AgentUtils.finallyClose(reader); + } + } + + /** + * load config from json string. + * + * @param jsonStr - json string + */ + public void loadJsonStrResource(String jsonStr) { + JsonElement tmpElement = JSON_PARSER.parse(jsonStr); + updateConfig(new HashMap<>(), 0, tmpElement); + } + + /** + * load config file from CLASS_PATH. config file is json file. 
+ * + * @param fileName - file name + */ + void loadJsonResource(String fileName) { + loadResource(fileName, true); + } + + void loadPropertiesResource(String fileName) { + loadResource(fileName, false); + } + + /** + * Convert json string to map + * + * @param keyDeptPath - map + * @param dept - json dept + * @param tmpElement - json element + */ + void updateConfig(HashMap<Integer, String> keyDeptPath, int dept, JsonElement tmpElement) { + if (tmpElement instanceof JsonObject) { + JsonObject tmpJsonObject = tmpElement.getAsJsonObject(); + for (String key : tmpJsonObject.keySet()) { + keyDeptPath.put(dept, key); + updateConfig(keyDeptPath, dept + 1, tmpJsonObject.get(key)); + } + } else if (tmpElement instanceof JsonArray) { + JsonArray tmpJsonArray = tmpElement.getAsJsonArray(); + String lastKey = keyDeptPath.getOrDefault(dept - 1, ""); + for (int index = 0; index < tmpJsonArray.size(); index++) { + keyDeptPath.put(dept - 1, lastKey + "[" + index + "]"); + updateConfig(keyDeptPath, dept, tmpJsonArray.get(index)); + } + } else if (tmpElement instanceof JsonPrimitive) { + List<String> builder = new ArrayList<>(); + for (int index = 0; index < dept; index++) { + builder.add(keyDeptPath.getOrDefault(index, "")); + } + String keyChain = StringUtils.join(builder, "."); + if (!StringUtils.isBlank(keyChain)) { + configStorage.put(keyChain, tmpElement.getAsJsonPrimitive()); + } + } + } + + /** + * get int from config + * + * @param key - key + * @param defaultValue - default value + * @return value + */ + public int getInt(String key, int defaultValue) { + JsonElement value = configStorage.get(key); + return value == null ? 
defaultValue : value.getAsInt(); + } + + /** + * get int from config + * + * @param key - key + * @return value + * @throws NullPointerException npe + */ + public int getInt(String key) { + JsonElement value = configStorage.get(key); + if (value == null) { + throw new NullPointerException("null value for key " + key); + } + return value.getAsInt(); + } + + /** + * get long + * + * @param key - key + * @param defaultValue - default value + * @return long + */ + public long getLong(String key, long defaultValue) { + JsonElement value = configStorage.get(key); + return value == null ? defaultValue : value.getAsLong(); + } + + /** + * get boolean + * + * @param key - key + * @param defaultValue - default value + * @return 返回boolean + */ + public boolean getBoolean(String key, boolean defaultValue) { + JsonElement value = configStorage.get(key); + return value == null ? defaultValue : value.getAsBoolean(); + } + + /** + * get string + * + * @param key - key + * @param defaultValue - default value + * @return string + */ + public String get(String key, String defaultValue) { + JsonElement value = configStorage.get(key); + return value == null ? 
defaultValue : value.getAsString(); + } + + /** + * get string or throw npe + * + * @param key - key + * @return string + * @throws NullPointerException if value is null, throw npe + */ + public String get(String key) { + JsonElement value = configStorage.get(key); + if (value == null) { + throw new NullPointerException("null value for key " + key); + } + return value.getAsString(); + } + + /** + * whether key exists + * + * @param key - key + * @return - true if key exists else not + */ + public boolean hasKey(String key) { + return configStorage.containsKey(key); + } + + /** + * set key/value + * + * @param key - key + * @param value - value + */ + public void set(String key, String value) { + configStorage.put(key, new JsonPrimitive(value)); + } + + public void setInt(String key, int value) { + configStorage.put(key, new JsonPrimitive(value)); + } + + public void setLong(String key, long value) { + configStorage.put(key, new JsonPrimitive(value)); + } + + public void setBoolean(String key, boolean value) { + configStorage.put(key, new JsonPrimitive(value)); + } + + Map<String, JsonPrimitive> getConfigStorage() { + return configStorage; + } + + List<String> getStorageList() { + List<String> result = new ArrayList<>(); + for (Map.Entry<String, JsonPrimitive> entry : configStorage.entrySet()) { + result.add(entry.getKey() + "=" + entry.getValue().getAsString()); + } + return result; + } +} diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java new file mode 100644 index 0000000..40c48e3 --- /dev/null +++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/AgentConstants.java @@ -0,0 +1,130 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Configuration keys used by the agent. Most keys are followed by a {@code DEFAULT_*}
 * constant holding the value to use when the key is not configured.
 *
 * <p>This class only holds constants and cannot be instantiated.
 */
public final class AgentConstants {

    public static final String AGENT_LOCAL_STORE_PATH = "agent.localStore.path";
    public static final String DEFAULT_AGENT_LOCAL_STORE_PATH = ".bdb";

    public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
    public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";

    public static final String AGENT_DB_INSTANCE_NAME = "agent.db.instance.name";
    public static final String DEFAULT_AGENT_DB_INSTANCE_NAME = "agent";

    public static final String AGENT_DB_CLASSNAME = "agent.db.classname";
    public static final String DEFAULT_AGENT_DB_CLASSNAME = "org.apache.tubemq.agent.db.BerkeleyDBImp";

    // default is empty.
    public static final String AGENT_FETCHER_CLASSNAME = "agent.fetcher.classname";

    public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
    public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;

    public static final String AGENT_CONF_PARENT = "agent.conf.parent";
    public static final String DEFAULT_AGENT_CONF_PARENT = "conf";

    public static final String AGENT_LOCAL_STORE_READONLY = "agent.localStore.readonly";
    public static final boolean DEFAULT_AGENT_LOCAL_STORE_READONLY = false;

    public static final String AGENT_HTTP_PORT = "agent.http.port";
    public static final int DEFAULT_AGENT_HTTP_PORT = 8008;

    public static final String AGENT_ENABLE_HTTP = "agent.http.enable";
    public static final boolean DEFAULT_AGENT_ENABLE_HTTP = false;

    public static final String TRIGGER_FETCH_INTERVAL = "trigger.fetch.interval";
    public static final int DEFAULT_TRIGGER_FETCH_INTERVAL = 1;

    public static final String AGENT_LOCAL_STORE_TRANSACTIONAL = "agent.localStore.transactional";
    public static final boolean DEFAULT_AGENT_LOCAL_STORE_TRANSACTIONAL = true;

    public static final String AGENT_LOCAL_STORE_LOCK_TIMEOUT = "agent.localStore.lockTimeout";
    public static final int DEFAULT_AGENT_LOCAL_STORE_LOCK_TIMEOUT = 10000;

    public static final String AGENT_LOCAL_STORE_NO_SYNC_VOID = "agent.localStore.noSyncVoid";
    public static final boolean DEFAULT_AGENT_LOCAL_STORE_NO_SYNC_VOID = false;

    // NOTE(review): this key starts its last segment with an upper-case letter, unlike the
    // other agent.localStore.* keys; left unchanged because existing config files may use it.
    public static final String AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID =
        "agent.localStore.WriteNoSyncVoid";
    public static final boolean DEFAULT_AGENT_LOCAL_STORE_WRITE_NO_SYNC_VOID = false;

    public static final String AGENT_FETCH_CENTER_INTERVAL_SECONDS = "agent.fetchCenter.interval";
    public static final int DEFAULT_AGENT_FETCH_CENTER_INTERVAL_SECONDS = 5;

    public static final String AGENT_TRIGGER_CHECK_INTERVAL_SECONDS = "agent.trigger.check.interval";
    public static final int DEFAULT_AGENT_TRIGGER_CHECK_INTERVAL_SECONDS = 1;

    public static final String THREAD_POOL_AWAIT_TIME = "thread.pool.await.time";
    public static final long DEFAULT_THREAD_POOL_AWAIT_TIME = 30;

    public static final String JOB_PENDING_MAX = "job.pending.max";
    public static final int DEFAULT_JOB_PENDING_MAX = 10;

    public static final String JOB_RUNNING_THREAD_CORE_SIZE = "job.running.thread.coreSize";
    public static final int DEFAULT_JOB_RUNNING_THREAD_CORE_SIZE = 1;

    public static final String JOB_MONITOR_INTERVAL = "job.monitor.interval";
    public static final int DEFAULT_JOB_MONITOR_INTERVAL = 5;

    public static final String JOB_RUNNING_THREAD_MAX_SIZE = "job.running.thread.maxSize";
    public static final int DEFAULT_JOB_RUNNING_THREAD_MAX_SIZE = 5;

    public static final String JOB_RUNNING_THREAD_KEEP_ALIVE = "job.running.thread.keepAlive";
    public static final long DEFAULT_JOB_RUNNING_THREAD_KEEP_ALIVE = 60L;

    public static final String JOB_FINISH_CHECK_INTERVAL = "job.finish.checkInterval";
    public static final long DEFAULT_JOB_FINISH_CHECK_INTERVAL = 6L;

    public static final String TASK_PENDING_MAX = "task.pending.max";
    public static final int DEFAULT_TASK_PENDING_MAX = 100;

    public static final String TASK_RUNNING_THREAD_CORE_SIZE = "task.running.thread.coreSize";
    public static final int DEFAULT_TASK_RUNNING_THREAD_CORE_SIZE = 4;

    public static final String TASK_RUNNING_THREAD_MAX_SIZE = "task.running.thread.maxSize";
    // Twice the processor count, but never below the default core size: on a host that
    // reports a single processor the plain product (2) would be smaller than the core size (4).
    // NOTE(review): presumably core/max sizes feed a ThreadPoolExecutor, which rejects
    // max < core - confirm against the consumer of these constants.
    public static final int DEFAULT_TASK_RUNNING_THREAD_MAX_SIZE =
        Math.max(DEFAULT_TASK_RUNNING_THREAD_CORE_SIZE,
            Runtime.getRuntime().availableProcessors() * 2);

    public static final String TASK_RUNNING_THREAD_KEEP_ALIVE = "task.running.thread.keepAlive";
    public static final long DEFAULT_TASK_RUNNING_THREAD_KEEP_ALIVE = 60L;

    public static final String TASK_RETRY_MAX_CAPACITY = "task.retry.maxCapacity";
    public static final int DEFAULT_TASK_RETRY_MAX_CAPACITY = 10000;

    public static final String TASK_MONITOR_INTERVAL = "task.monitor.interval";
    public static final int DEFAULT_TASK_MONITOR_INTERVAL = 6;

    public static final String TASK_RETRY_SUBMIT_WAIT_SECONDS = "task.retry.submit.waitSeconds";
    public static final int DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS = 5;

    public static final String TASK_MAX_RETRY_TIME = "task.maxRetry.time";
    public static final int DEFAULT_TASK_MAX_RETRY_TIME = 3;

    public static final String TASK_PUSH_MAX_SECOND = "task.push.maxSecond";
    public static final int DEFAULT_TASK_PUSH_MAX_SECOND = 2;

    public static final String TASK_PULL_MAX_SECOND = "task.pull.maxSecond";
    public static final int DEFAULT_TASK_PULL_MAX_SECOND = 2;

    public static final String CHANNEL_MEMORY_CAPACITY = "channel.memory.capacity";
    public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 5000;

    public static final String TRIGGER_CHECK_INTERVAL = "trigger.check.interval";
    public static final int DEFAULT_TRIGGER_CHECK_INTERVAL = 2;

    public static final String WORKER_POOL_AWAIT_TIME = "worker.pool.await.time";
    public static final long DEFAULT_WORKER_POOL_AWAIT_TIME = 10;

    private AgentConstants() {
        // constants holder, not meant to be instantiated
    }
}
+ */ +package org.apache.tubemq.agent.utils; + +import java.io.Closeable; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AgentUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(AgentUtils.class); + private static final AtomicLong index = new AtomicLong(0); + + /** + * finally close resources + * + * @param resource - resource which is closable. + */ + public static void finallyClose(Closeable resource) { + if (resource != null) { + try { + resource.close(); + } catch (Exception ex) { + LOGGER.info("error while closing", ex); + } + } + } + + /** + * finally close resources. + * + * @param resource - resource which is closable. + */ + public static void finallyClose(AutoCloseable resource) { + if (resource != null) { + try { + resource.close(); + } catch (Exception ex) { + LOGGER.error("error while closing", ex); + } + } + } + + /** + * Get declare fields. + */ + public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) { + List<Field> fields = new ArrayList<Field>(); + // check whether parent exists + while (clazz != null) { + fields.addAll(Arrays.asList(clazz.getDeclaredFields())); + clazz = clazz.getSuperclass(); + } + return fields; + } + + /** + * Get declare methods. 
+ * + * @param clazz - class of field from method return + * @return list of methods + */ + public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) { + List<Method> methods = new ArrayList<Method>(); + while (clazz != null) { + methods.addAll(Arrays.asList(clazz.getDeclaredMethods())); + clazz = clazz.getSuperclass(); + } + return methods; + } + + /** + * get random int of [seed, seed * 2] + * @param seed + * @return + */ + public static int getRandomBySeed(int seed) { + return ThreadLocalRandom.current().nextInt(0, seed) + seed; + } + + public static String getLocalIP() { + String ip = "127.0.0.1"; + try (DatagramSocket socket = new DatagramSocket()) { + socket.connect(InetAddress.getByName("8.8.8.8"), 10002); + ip = socket.getLocalAddress().getHostAddress(); + } catch (Exception ex) { + LOGGER.error("error while get local ip", ex); + } + return ip; + } + + /** + * Get uniq id with timestamp. + * + * @return uniq id. + */ + public static String getUniqId(String id) { + // timestamp in seconds + long currentTime = System.currentTimeMillis() / 1000; + return "job_" + id + "_" + currentTime + "_" + index.getAndIncrement(); + } + + public static void silenceSleepInMs(long millisecond) { + try { + TimeUnit.MILLISECONDS.sleep(millisecond); + } catch (Exception ignored) { + + } + } +} diff --git a/tubemq-agent/agent-core/pom.xml b/tubemq-agent/agent-core/pom.xml new file mode 100644 index 0000000..a58d332 --- /dev/null +++ b/tubemq-agent/agent-core/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.tubemq</groupId> + <artifactId>tubemq-agent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>agent-core</artifactId> + <modelVersion>4.0.0</modelVersion> + + <dependencies> + <dependency> + <artifactId>junit</artifactId> + <groupId>junit</groupId> + <scope>test</scope> + </dependency> + <dependency> + <artifactId>awaitility</artifactId> + <groupId>org.awaitility</groupId> + <scope>test</scope> + </dependency> + + <dependency> + <artifactId>agent-common</artifactId> + <groupId>org.apache.tubemq</groupId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.tubemq</groupId> + <artifactId>agent-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java new file mode 100644 index 
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tubemq.agent;

import java.util.Iterator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.tubemq.agent.conf.AgentConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Agent entrance class: parses the command line and copies recognised
 * option values into the shared {@link AgentConfiguration}.
 */
public class AgentMain {
    private static final Logger LOGGER = LoggerFactory.getLogger(AgentMain.class);

    /** Program name printed in the usage banner. */
    private static final String PROGRAM_NAME = "tubemq-agent";

    /** Long name of the help flag. */
    private static final String OPT_HELP = "help";

    /**
     * Build the set of options understood by the agent.
     *
     * @return a fresh options instance
     */
    private static Options buildOptions() {
        Options options = new Options();
        options.addOption("h", OPT_HELP, false, "show help");
        return options;
    }

    /**
     * Print help information. Does not terminate the JVM; callers decide
     * which exit status is appropriate.
     *
     * @param opts - options
     */
    private static void help(Options opts) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(PROGRAM_NAME, opts);
    }

    /**
     * Init options
     *
     * <p>If the arguments cannot be parsed, the error is logged, the usage is printed
     * and the JVM is terminated with exit status 1.</p>
     *
     * @param args - argument
     * @return command line; {@code null} can only be observed if {@link System#exit(int)}
     *         is prevented from terminating the JVM
     */
    public static CommandLine initOptions(String[] args) {
        CommandLineParser parser = new DefaultParser();
        Options options = buildOptions();
        try {
            return parser.parse(options, args);
        } catch (ParseException ex) {
            // Report why parsing failed instead of silently exiting with a success status.
            LOGGER.error("failed to parse command line arguments", ex);
            help(options);
            System.exit(1);
        }
        return null;
    }

    /**
     * Init agent conf
     *
     * <p>For every parsed option that has a long name and a value, the trimmed value is
     * stored in the agent configuration, provided the configuration already contains a
     * key equal to the long name.</p>
     *
     * @param cl - commandline
     */
    public static void initAgentConf(CommandLine cl) {
        AgentConfiguration conf = AgentConfiguration.getAgentConf();
        for (Iterator<Option> iterator = cl.iterator(); iterator.hasNext();) {
            Option opt = iterator.next();
            if (opt == null || opt.getLongOpt() == null || opt.getValue() == null) {
                continue;
            }
            if (conf.hasKey(opt.getLongOpt())) {
                conf.set(opt.getLongOpt(), opt.getValue().trim());
            }
        }
    }

    /**
     * Main entrance.
     *
     * @param args - arguments
     * @throws Exception exceptions
     */
    public static void main(String[] args) throws Exception {
        CommandLine cl = initOptions(args);
        // Explicit check: a plain "assert" is skipped unless the JVM runs with -ea.
        if (cl == null) {
            throw new IllegalStateException("command line could not be parsed");
        }
        if (cl.hasOption(OPT_HELP)) {
            help(buildOptions());
            return;
        }
        initAgentConf(cl);
    }
}
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tubemq.agent.core;

import java.io.PrintWriter;
import java.io.StringWriter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the commons-cli usage the agent entry point relies on:
 * rendering the help text and parsing an option that carries a value.
 */
public class TestAgentMain {

    /**
     * The rendered usage must mention the registered help option and its description.
     */
    @Test
    public void testOpts() throws Exception {
        Options options = new Options();
        options.addOption("h", "help", false, "show help");

        // Render into a buffer so the output can be asserted on instead of only printed.
        StringWriter buffer = new StringWriter();
        PrintWriter writer = new PrintWriter(buffer);
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(writer, HelpFormatter.DEFAULT_WIDTH, "./run.sh", null, options,
                HelpFormatter.DEFAULT_LEFT_PAD, HelpFormatter.DEFAULT_DESC_PAD, null);
        writer.flush();

        String usage = buffer.toString();
        Assert.assertTrue(usage.contains("./run.sh"));
        Assert.assertTrue(usage.contains("--help"));
        Assert.assertTrue(usage.contains("show help"));
    }

    /**
     * An option declared with an argument must expose the value passed on the command line.
     */
    @Test
    public void testOptsOverride() throws Exception {
        Options options = new Options();
        options.addOption("t", "test", true, "test args");

        CommandLineParser parser = new DefaultParser();
        // Option name and value are separate argv entries, as a shell would deliver them.
        CommandLine cl = parser.parse(options, new String[]{"--test", "all the"});
        Assert.assertTrue(cl.hasOption("test"));
        Assert.assertEquals("all the", cl.getOptionValue("test"));
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<!--

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<!-- Parent POM of the agent modules: module list, shared versions, managed dependencies and build plugins. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>org.apache.tubemq</groupId>
    <artifactId>tubemq-agent</artifactId>
    <packaging>pom</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <modelVersion>4.0.0</modelVersion>

    <modules>
        <module>agent-common</module>
        <module>agent-core</module>
    </modules>

    <!-- Version properties referenced by the managed dependencies and plugins below -->
    <properties>
        <awaitility.version>4.0.3</awaitility.version>
        <bytebuddy.version>1.10.10</bytebuddy.version>
        <common.io>2.6</common.io>
        <common.lang3.version>3.10</common.lang3.version>
        <commons.cli.version>1.4</commons.cli.version>
        <dbutils.version>1.7</dbutils.version>
        <encoding>UTF-8</encoding>
        <gson.version>2.8.5</gson.version>
        <guava.version>12.0.1</guava.version>
        <jdk.version>1.8</jdk.version>
        <log4j2.version>2.13.1</log4j2.version>
        <mockito.version>3.3.3</mockito.version>
        <plugin.assembly.version>3.2.0</plugin.assembly.version>
        <plugin.compile.version>3.8.1</plugin.compile.version>
        <slf4j.version>1.7.30</slf4j.version>
        <unit.version>4.13</unit.version>
        <bussdk.version>1.2.17</bussdk.version>
        <common.lang.version>2.4</common.lang.version>
        <spring.version>2.5.6</spring.version>
        <oro.version>2.0.8</oro.version>
        <aviator.version>2.2.1</aviator.version>
        <avro.version>1.7.2</avro.version>

        <netty.version>3.8.0.Final</netty.version>
        <snappy.version>1.0.4.1</snappy.version>
        <protobuf.version>2.5.0</protobuf.version>
        <httpclient.version>4.5.13</httpclient.version>
        <fastjson.version>1.2.68</fastjson.version>
        <sleepycat.version>6.4.9</sleepycat.version>
        <hippoclient.version>2.0.5</hippoclient.version>
        <jetty.version>9.4.34.v20201102</jetty.version>
        <rocksdb.version>6.14.6</rocksdb.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>${httpclient.version}</version>
            </dependency>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>${protobuf.version}</version>
            </dependency>
            <!-- rocksdbjni is managed once, here; each groupId:artifactId should appear only once in this section -->
            <dependency>
                <groupId>org.rocksdb</groupId>
                <artifactId>rocksdbjni</artifactId>
                <version>${rocksdb.version}</version>
            </dependency>
            <dependency>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
                <version>${snappy.version}</version>
            </dependency>
            <dependency>
                <groupId>com.tencent.tdbank</groupId>
                <artifactId>TDBusSDK</artifactId>
                <version>${bussdk.version}</version>
            </dependency>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
                <version>${netty.version}</version>
            </dependency>
            <dependency>
                <artifactId>commons-dbutils</artifactId>
                <groupId>commons-dbutils</groupId>
                <version>${dbutils.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>${avro.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-ipc</artifactId>
                <version>${avro.version}</version>
            </dependency>

            <dependency>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
                <version>${slf4j.version}</version>
            </dependency>

            <dependency>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
                <version>${slf4j.version}</version>
            </dependency>

            <dependency>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
                <version>${guava.version}</version>
            </dependency>

            <dependency>
                <artifactId>gson</artifactId>
                <groupId>com.google.code.gson</groupId>
                <version>${gson.version}</version>
            </dependency>

            <dependency>
                <artifactId>commons-cli</artifactId>
                <groupId>commons-cli</groupId>
                <version>${commons.cli.version}</version>
            </dependency>

            <dependency>
                <groupId>commons-lang</groupId>
                <artifactId>commons-lang</artifactId>
                <version>${common.lang.version}</version>
            </dependency>

            <dependency>
                <groupId>oro</groupId>
                <artifactId>oro</artifactId>
                <version>${oro.version}</version>
            </dependency>

            <dependency>
                <groupId>com.googlecode.aviator</groupId>
                <artifactId>aviator</artifactId>
                <version>${aviator.version}</version>
            </dependency>
            <dependency>
                <groupId>com.sleepycat</groupId>
                <artifactId>je</artifactId>
                <version>${sleepycat.version}</version>
            </dependency>
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>2.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.ini4j</groupId>
                <artifactId>ini4j</artifactId>
                <version>0.5.1</version>
            </dependency>

            <dependency>
                <artifactId>commons-lang3</artifactId>
                <groupId>org.apache.commons</groupId>
                <version>${common.lang3.version}</version>
            </dependency>

            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring</artifactId>
                <version>${spring.version}</version>
            </dependency>

            <dependency>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-server</artifactId>
                <version>${jetty.version}</version>
            </dependency>

            <dependency>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-servlet</artifactId>
                <version>${jetty.version}</version>
            </dependency>

            <dependency>
                <artifactId>commons-io</artifactId>
                <groupId>commons-io</groupId>
                <version>${common.io}</version>
            </dependency>

            <dependency>
                <artifactId>junit</artifactId>
                <groupId>junit</groupId>
                <scope>test</scope>
                <version>${unit.version}</version>
            </dependency>

            <!-- Internal modules that depend on each other -->
            <dependency>
                <artifactId>agent-common</artifactId>
                <groupId>org.apache.tubemq</groupId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <artifactId>agent-core</artifactId>
                <groupId>org.apache.tubemq</groupId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <artifactId>agent-plugins</artifactId>
                <groupId>org.apache.tubemq</groupId>
                <version>${project.version}</version>
            </dependency>
            <dependency>
                <artifactId>mockito-core</artifactId>
                <groupId>org.mockito</groupId>
                <scope>test</scope>
                <version>${mockito.version}</version>
            </dependency>
            <dependency>
                <artifactId>byte-buddy</artifactId>
                <groupId>net.bytebuddy</groupId>
                <scope>test</scope>
                <version>${bytebuddy.version}</version>
            </dependency>

            <dependency>
                <artifactId>awaitility</artifactId>
                <groupId>org.awaitility</groupId>
                <scope>test</scope>
                <version>${awaitility.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <pluginRepositories>
        <pluginRepository>
            <id>central</id>
            <name>Nexus tencent</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <url>https://mirrors.tencent.com/nexus/repository/maven-public/</url>
        </pluginRepository>
    </pluginRepositories>

    <repositories>
        <repository>
            <id>central</id>
            <name>Nexus tencent</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <url>https://mirrors.tencent.com/nexus/repository/maven-public/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <encoding>${encoding}</encoding>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
                <version>${plugin.compile.version}</version>
            </plugin>
            <!-- Checkstyle is bound to the validate phase and fails the build on violations -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <version>2.16</version>
                <dependencies>
                    <dependency>
                        <groupId>com.puppycrawl.tools</groupId>
                        <artifactId>checkstyle</artifactId>
                        <version>6.19</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <configLocation>../codestyle/checkstyle.xml</configLocation>
                    <suppressionsLocation>../codestyle/suppressions.xml</suppressionsLocation>
                    <encoding>UTF-8</encoding>
                    <consoleOutput>true</consoleOutput>
                    <failOnViolation>true</failOnViolation>
                    <includeResources>false</includeResources>
                    <includeTestResources>false</includeTestResources>
                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
                </configuration>
                <executions>
                    <execution>
                        <id>checkstyle</id>
                        <phase>validate</phase>
                        <goals>
                            <goal>check</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
