This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6260e1a705 [INLONG-10113][Agent] Delete useless code (#10114)
6260e1a705 is described below
commit 6260e1a705e75ad2b8bc489ddf198d46d2c4a107
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Apr 30 11:11:20 2024 +0800
[INLONG-10113][Agent] Delete useless code (#10114)
---
.../inlong/agent/constant/AgentConstants.java | 65 ---------
.../inlong/agent/constant/CommonConstants.java | 70 ---------
.../inlong/agent/constant/FetcherConstants.java | 3 -
.../apache/inlong/agent/constant/JobConstants.java | 161 ---------------------
.../inlong/agent/constant/KubernetesConstants.java | 36 -----
.../inlong/agent/constant/MetadataConstants.java | 41 ------
.../inlong/agent/constant/TaskConstants.java | 21 ---
.../org/apache/inlong/agent/pojo/FileTask.java | 17 ---
.../apache/inlong/agent/pojo/TaskProfileDto.java | 12 --
.../inlong/agent/plugin/utils/MetaDataUtils.java | 139 ------------------
.../inlong/agent/plugin/utils/PluginUtils.java | 29 ----
.../agent/plugin/utils/file/FileDataUtils.java | 122 ----------------
.../apache/inlong/agent/plugin/sinks/MockSink.java | 4 +-
.../agent/plugin/utils/MetaDataUtilsTest.java | 37 -----
14 files changed, 2 insertions(+), 755 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 20bf4af793..36c5423ca3 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -29,75 +29,20 @@ public class AgentConstants {
public static final String AGENT_HOME = "agent.home";
public static final String DEFAULT_AGENT_HOME =
System.getProperty("agent.home");
-
- public static final String AGENT_LOCAL_CACHE = "agent.local.cache";
- public static final String DEFAULT_AGENT_LOCAL_CACHE = ".local";
-
- public static final String AGENT_LOCAL_CACHE_TIMEOUT =
"agent.local.cache.timeout";
- /**
- * cache timeout in minutes.
- **/
- public static final int DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT = 30;
-
- public static final String AGENT_LOCAL_STORE_PATH =
"agent.localStore.path";
- public static final String DEFAULT_AGENT_LOCAL_STORE_PATH = ".bdb";
-
public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path";
public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb";
public static final String AGENT_LOCAL_DB_PATH_TASK = ".localdb/task";
public static final String AGENT_LOCAL_DB_PATH_INSTANCE =
".localdb/instance";
public static final String AGENT_LOCAL_DB_PATH_OFFSET = ".localdb/offset";
-
public static final String AGENT_UNIQ_ID = "agent.uniq.id";
- public static final String AGENT_DB_INSTANCE_NAME =
"agent.db.instance.name";
- public static final String DEFAULT_AGENT_DB_INSTANCE_NAME = "agent";
- public static final String AGENT_DB_CLASSNAME = "agent.db.classname";
- public static final String DEFAULT_AGENT_DB_CLASSNAME =
"org.apache.inlong.agent.db.RocksDbImp";
// default is empty.
public static final String AGENT_FETCHER_CLASSNAME =
"agent.fetcher.classname";
- public static final String AGENT_MESSAGE_FILTER_CLASSNAME =
"agent.message.filter.classname";
public static final String AGENT_CONF_PARENT = "agent.conf.parent";
public static final String DEFAULT_AGENT_CONF_PARENT = "conf";
public static final String AGENT_HTTP_PORT = "agent.http.port";
public static final int DEFAULT_AGENT_HTTP_PORT = 8008;
- public static final String AGENT_ENABLE_HTTP = "agent.http.enable";
- public static final boolean DEFAULT_AGENT_ENABLE_HTTP = false;
- public static final String TRIGGER_FETCH_INTERVAL =
"trigger.fetch.interval";
- public static final int DEFAULT_TRIGGER_FETCH_INTERVAL = 1;
- public static final String TRIGGER_MAX_RUNNING_NUM =
"trigger.max.running.num";
- public static final int DEFAULT_TRIGGER_MAX_RUNNING_NUM = 4096;
- public static final String AGENT_FETCH_CENTER_INTERVAL_SECONDS =
"agent.fetchCenter.interval";
- public static final int DEFAULT_AGENT_FETCH_CENTER_INTERVAL_SECONDS = 5;
- public static final String AGENT_TRIGGER_CHECK_INTERVAL_SECONDS =
"agent.trigger.check.interval";
- public static final int DEFAULT_AGENT_TRIGGER_CHECK_INTERVAL_SECONDS = 1;
- public static final String THREAD_POOL_AWAIT_TIME =
"thread.pool.await.time";
- // time in ms
- public static final long DEFAULT_THREAD_POOL_AWAIT_TIME = 300;
- public static final String JOB_MONITOR_INTERVAL = "job.monitor.interval";
- public static final int DEFAULT_JOB_MONITOR_INTERVAL = 5;
- public static final String JOB_FINISH_CHECK_INTERVAL =
"job.finish.checkInterval";
- public static final long DEFAULT_JOB_FINISH_CHECK_INTERVAL = 6L;
- public static final String TASK_RETRY_MAX_CAPACITY =
"task.retry.maxCapacity";
- public static final int DEFAULT_TASK_RETRY_MAX_CAPACITY = 10000;
- public static final String TASK_MONITOR_INTERVAL = "task.monitor.interval";
- public static final int DEFAULT_TASK_MONITOR_INTERVAL = 6;
- public static final String TASK_RETRY_SUBMIT_WAIT_SECONDS =
"task.retry.submit.waitSeconds";
- public static final int DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS = 5;
- public static final String TASK_MAX_RETRY_TIME = "task.maxRetry.time";
- public static final int DEFAULT_TASK_MAX_RETRY_TIME = 3;
- public static final String TASK_PUSH_MAX_SECOND = "task.push.maxSecond";
- public static final int DEFAULT_TASK_PUSH_MAX_SECOND = 2;
- public static final String TASK_PULL_MAX_SECOND = "task.pull.maxSecond";
- public static final int DEFAULT_TASK_PULL_MAX_SECOND = 2;
public static final String CHANNEL_MEMORY_CAPACITY =
"channel.memory.capacity";
public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 2000;
- public static final String TRIGGER_CHECK_INTERVAL =
"trigger.check.interval";
- public static final int DEFAULT_TRIGGER_CHECK_INTERVAL = 2;
- public static final String JOB_DB_CACHE_TIME = "job.db.cache.time";
- // cache for 3 days.
- public static final long DEFAULT_JOB_DB_CACHE_TIME = 3 * 24 * 60 * 60 *
1000;
- public static final String JOB_DB_CACHE_CHECK_INTERVAL =
"job.db.cache.check.interval";
- public static final int DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL = 60 * 60;
public static final String JOB_NUMBER_LIMIT = "job.number.limit";
public static final int DEFAULT_JOB_NUMBER_LIMIT = 15;
public static final String AGENT_LOCAL_IP = "agent.local.ip";
@@ -128,16 +73,9 @@ public class AgentConstants {
public static final String AGENT_HISTORY_PATH = "agent.history.path";
public static final String DEFAULT_AGENT_HISTORY_PATH = ".history";
- public static final String JOB_VERSION = "job.version";
- public static final Integer DEFAULT_JOB_VERSION = 1;
-
public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit";
public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;
- public static final String AGENT_METRIC_LISTENER_CLASS =
"agent.domainListeners";
- public static final String AGENT_METRIC_LISTENER_CLASS_DEFAULT =
- "org.apache.inlong.agent.metrics.AgentPrometheusMetricListener";
-
// pulsar sink config
public static final String PULSAR_CLIENT_IO_TREHAD_NUM =
"agent.sink.pulsar.client.io.thread.num";
public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1,
@@ -196,7 +134,4 @@ public class AgentConstants {
public static final String DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER =
"org.apache.kafka.common.serialization.ByteArraySerializer";
-
- public static final String AGENT_JOB_STORE_TIME = "agent.job.store.time";
- public static final long DEFAULT_JOB_STORE_TIME = 10 * 60 * 1000;
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index eea64d176d..a92b3222ad 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -17,29 +17,17 @@
package org.apache.inlong.agent.constant;
-import org.apache.inlong.agent.utils.AgentUtils;
-
/**
* Common constants.
*/
public class CommonConstants {
- public static final String PROXY_NET_TAG = "proxy.net.tag";
- public static final String DEFAULT_PROXY_NET_TAG = "";
-
public static final String PROXY_INLONG_GROUP_ID = "proxy.inlongGroupId";
public static final String DEFAULT_PROXY_INLONG_GROUP_ID =
"default_inlong_group_id";
- public static final String POSITION_SUFFIX = ".position";
public static final String PROXY_INLONG_STREAM_ID = "proxy.inlongStreamId";
public static final String DEFAULT_PROXY_INLONG_STREAM_ID =
"default_inlong_stream_id";
- public static final String PROXY_LOCAL_HOST = "proxy.localHost";
- public static final String DEFAULT_PROXY_LOCALHOST =
AgentUtils.getLocalIp();
-
- public static final String PROXY_IS_LOCAL_VISIT = "proxy.isLocalVisit";
- public static final boolean DEFAULT_PROXY_IS_LOCAL_VISIT = true;
-
public static final String PROXY_TOTAL_ASYNC_PROXY_SIZE =
"proxy.total.async.proxy.size";
public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE = 200 * 1024
* 1024;
@@ -99,73 +87,15 @@ public class CommonConstants {
public static final String PROXY_KEY_GROUP_ID = "inlongGroupId";
public static final String PROXY_KEY_STREAM_ID = "inlongStreamId";
public static final String PROXY_KEY_DATA = "dataKey";
- public static final String PROXY_KEY_ID = "id";
- public static final String PROXY_KEY_AGENT_IP = "agentip";
- public static final String PROXY_OCEANUS_F = "f";
- public static final String PROXY_OCEANUS_BL = "bl";
-
- // config for pulsar
- // pulsar host port like http://host1:port1
- public static final String PULSAR_SERVERS = "pulsar.servers";
- // pulsar topic name
- public static final String PULSAR_TOPIC = "pulsar.topic";
- // whether async sending data
- public static final String PULSAR_PRODUCER_ASYNC = "pulsar.producer.async";
- public static final boolean DEFAULT_PULSAR_PRODUCER_ASYNC = true;
-
- public static final String PULSAR_PRODUCER_MAX_PENDING_COUNT =
"pulsar.producer.maxPending.count";
- public static final int DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT = 10000;
-
- public static final String PULSAR_PRODUCER_THREAD_NUM =
"pulsar.producer.thread.num";
- public static final int DEFAULT_PULSAR_PRODUCER_THREAD_NUM = 1;
-
- public static final String PULSAR_PRODUCER_ENABLE_BATCH =
"pulsar.producer.enable.batch";
- public static final boolean DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH = true;
-
- public static final String PULSAR_SINK_POLL_TIMEOUT =
"pulsar.sink.poll.timeout";
- // time in ms
- public static final long DEFAULT_PULSAR_SINK_POLL_TIMEOUT = 1000;
-
- public static final String PULSAR_SINK_CACHE_CAPACITY =
"pulsar.sink.cache.capacity";
- public static final int DEFAULT_PULSAR_SINK_CACHE_CAPACITY = 100000;
-
- public static final String PULSAR_PRODUCER_COMPRESS_TYPE =
"pulsar.producer.compress.type";
- public static final String DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE =
"snappy";
-
- public static final String PULSAR_PRODUCER_BATCH_MAXSIZE =
"pulsar.producer.batch.maxsize";
- public static final int DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE = 1024 *
1024;
-
- public static final String PULSAR_PRODUCER_BATCH_MAXCOUNT =
"pulsar.producer.batch.maxcount";
- public static final int DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT = 1000;
-
- public static final String PULSAR_PRODUCER_BLOCK_QUEUE =
"pulsar.producer.block.queue";
- public static final boolean DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE = true;
public static final int DEFAULT_FILE_MAX_NUM = 4096;
-
- public static final String FILE_MAX_NUM = "file.max.num";
-
- public static final String TRIGGER_ID_PREFIX = "trigger_";
-
public static final String TASK_ID_PREFIX = "task_";
-
public static final String INSTANCE_ID_PREFIX = "ins_";
-
public static final String OFFSET_ID_PREFIX = "offset_";
-
- public static final String COMMAND_STORE_INSTANCE_NAME = "commandStore";
-
public static final String AGENT_OS_NAME = "os.name";
public static final String AGENT_NIX_OS = "nix";
public static final String AGENT_NUX_OS = "nux";
public static final String AGENT_COLON = ":";
-
public static final Integer DEFAULT_MAP_CAPACITY = 16;
-
- public static final String KEY_METRICS_INDEX = "metricsIndex";
-
public static final String COMMA = ",";
- public static final String DELIMITER_UNDERLINE = "_";
- public static final String DELIMITER_HYPHEN = "-";
-
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index e2004e6350..727748ae05 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -25,8 +25,6 @@ public class FetcherConstants {
public static final String AGENT_FETCHER_INTERVAL =
"agent.fetcher.interval";
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;
- public static final String AGENT_HEARTBEAT_INTERVAL =
"agent.heartbeat.interval";
- public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10;
public static final String AGENT_MANAGER_REQUEST_TIMEOUT =
"agent.manager.request.timeout";
// default is 30s
public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30;
@@ -51,7 +49,6 @@ public class FetcherConstants {
public static final int AGENT_HTTP_SUCCESS_CODE = 200;
- public static final String AGENT_MANAGER_RETURN_PARAM_IP = "ip";
public static final String AGENT_MANAGER_RETURN_PARAM_DATA = "data";
public static final String AGENT_MANAGER_AUTH_SECRET_ID =
"agent.manager.auth.secretId";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 7a3347f6e8..2ed1bc3666 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -25,175 +25,14 @@ public class JobConstants extends CommonConstants {
// job id
public static final String JOB_ID = "job.id";
public static final String JOB_INSTANCE_ID = "job.instance.id";
- public static final String JOB_IP = "job.ip";
- public static final String JOB_RETRY = "job.retry";
- public static final String JOB_UUID = "job.uuid";
public static final String JOB_GROUP_ID = "job.groupId";
public static final String JOB_STREAM_ID = "job.streamId";
public static final String JOB_SOURCE_CLASS = "job.source";
- public static final String JOB_SOURCE_TYPE = "job.sourceType";
-
public static final String JOB_CHANNEL = "job.channel";
- public static final String JOB_NAME = "job.name";
- public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
-
- public static final String DEFAULT_JOB_NAME = "default";
- public static final String JOB_DESCRIPTION = "job.description";
- public static final String DEFAULT_JOB_DESCRIPTION = "default job
description";
- public static final String DEFAULT_JOB_LINE_FILTER = "";
// sink config
public static final String JOB_SINK = "job.sink";
- public static final String JOB_PROXY_SEND = "job.proxySend";
- public static final boolean DEFAULT_JOB_PROXY_SEND = false;
public static final String JOB_MQ_ClUSTERS = "job.mqClusters";
public static final String JOB_MQ_TOPIC = "job.topicInfo";
-
- // File job
- public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger";
- public static final String JOB_DIR_FILTER_PATTERN =
"job.fileJob.dir.pattern"; // deprecated
- public static final String JOB_DIR_FILTER_PATTERNS =
"job.fileJob.dir.patterns";
- public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset";
- public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait";
- public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
- public static final String JOB_FILE_TRIGGER_TYPE =
"job.fileJob.collectType";
- public static final String JOB_FILE_LINE_END_PATTERN =
"job.fileJob.line.endPattern";
- public static final String JOB_FILE_CONTENT_COLLECT_TYPE =
"job.fileJob.contentCollectType";
- public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
- public static final String JOB_FILE_META_FILTER_BY_LABELS =
"job.fileJob.filterMetaByLabels";
- public static final String JOB_FILE_PROPERTIES = "job.fileJob.properties";
- public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR =
"job.fileJob.dataSeparator";
- public static final String JOB_FILE_MONITOR_INTERVAL =
"job.fileJob.monitorInterval";
- public static final String JOB_FILE_MONITOR_STATUS =
"job.fileJob.monitorStatus";
- public static final String JOB_FILE_MONITOR_EXPIRE =
"job.fileJob.monitorExpire";
-
- // Binlog job
- public static final String JOB_DATABASE_USER = "job.binlogJob.user";
- public static final String JOB_DATABASE_PASSWORD =
"job.binlogJob.password";
- public static final String JOB_DATABASE_HOSTNAME =
"job.binlogJob.hostname";
- public static final String JOB_TABLE_WHITELIST =
"job.binlogJob.tableWhiteList";
- public static final String JOB_DATABASE_WHITELIST =
"job.binlogJob.databaseWhiteList";
- public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets";
- public static final String JOB_DATABASE_OFFSET_FILENAME =
"job.binlogJob.offset.filename";
-
- public static final String JOB_DATABASE_SERVER_TIME_ZONE =
"job.binlogJob.serverTimezone";
- public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS =
"job.binlogJob.offset.intervalMs";
-
- public static final String JOB_DATABASE_STORE_HISTORY_FILENAME =
"job.binlogJob.history.filename";
- public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES =
"job.binlogJob.schema";
- public static final String JOB_DATABASE_SNAPSHOT_MODE =
"job.binlogJob.snapshot.mode";
- public static final String JOB_DATABASE_HISTORY_MONITOR_DDL =
"job.binlogJob.ddl";
- public static final String JOB_DATABASE_PORT = "job.binlogJob.port";
-
- // Kafka job
- public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
- public static final String JOB_KAFKA_BOOTSTRAP_SERVERS =
"job.kafkaJob.bootstrap.servers";
- public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
- public static final String JOB_KAFKA_RECORD_SPEED_LIMIT =
"job.kafkaJob.recordSpeed.limit";
- public static final String JOB_KAFKA_BYTE_SPEED_LIMIT =
"job.kafkaJob.byteSpeed.limit";
- public static final String JOB_KAFKA_OFFSET =
"job.kafkaJob.partition.offset";
- public static final String JOB_KAFKA_READ_TIMEOUT =
"job.kafkaJob.read.timeout";
- public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET =
"job.kafkaJob.autoOffsetReset";
-
- public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
- public static final String JOB_MONGO_USER = "job.mongoJob.user";
- public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password";
- public static final String JOB_MONGO_DATABASE_INCLUDE_LIST =
"job.mongoJob.databaseIncludeList";
- public static final String JOB_MONGO_DATABASE_EXCLUDE_LIST =
"job.mongoJob.databaseExcludeList";
- public static final String JOB_MONGO_COLLECTION_INCLUDE_LIST =
"job.mongoJob.collectionIncludeList";
- public static final String JOB_MONGO_COLLECTION_EXCLUDE_LIST =
"job.mongoJob.collectionExcludeList";
- public static final String JOB_MONGO_FIELD_EXCLUDE_LIST =
"job.mongoJob.fieldExcludeList";
- public static final String JOB_MONGO_SNAPSHOT_MODE =
"job.mongoJob.snapshotMode";
- public static final String JOB_MONGO_CAPTURE_MODE =
"job.mongoJob.captureMode";
- public static final String JOB_MONGO_QUEUE_SIZE = "job.mongoJob.queueSize";
- public static final String JOB_MONGO_STORE_HISTORY_FILENAME =
"job.mongoJob.history.filename";
- public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE =
"job.mongoJob.offset.specificOffsetFile";
- public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS =
"job.mongoJob.offset.specificOffsetPos";
- public static final String JOB_MONGO_OFFSETS = "job.mongoJob.offsets";
- public static final String JOB_MONGO_CONNECT_TIMEOUT_MS =
"job.mongoJob.connectTimeoutInMs";
- public static final String JOB_MONGO_CURSOR_MAX_AWAIT =
"job.mongoJob.cursorMaxAwaitTimeInMs";
- public static final String JOB_MONGO_SOCKET_TIMEOUT =
"job.mongoJob.socketTimeoutInMs";
- public static final String JOB_MONGO_SELECTION_TIMEOUT =
"job.mongoJob.selectionTimeoutInMs";
- public static final String JOB_MONGO_FIELD_RENAMES =
"job.mongoJob.fieldRenames";
- public static final String JOB_MONGO_MEMBERS_DISCOVER =
"job.mongoJob.membersAutoDiscover";
- public static final String JOB_MONGO_CONNECT_MAX_ATTEMPTS =
"job.mongoJob.connectMaxAttempts";
- public static final String JOB_MONGO_BACKOFF_MAX_DELAY =
"job.mongoJob.connectBackoffMaxDelayInMs";
- public static final String JOB_MONGO_BACKOFF_INITIAL_DELAY =
"job.mongoJob.connectBackoffInitialDelayInMs";
- public static final String JOB_MONGO_INITIAL_SYNC_MAX_THREADS =
"job.mongoJob.initialSyncMaxThreads";
- public static final String JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED =
"job.mongoJob.sslInvalidHostnameAllowed";
- public static final String JOB_MONGO_SSL_ENABLE =
"job.mongoJob.sslEnabled";
- public static final String JOB_MONGO_POLL_INTERVAL =
"job.mongoJob.pollIntervalInMs";
-
- public static final Long JOB_KAFKA_DEFAULT_OFFSET = 0L;
-
- // job type, delete/add
- public static final String JOB_TYPE = "job.type";
-
- public static final String JOB_CHECKPOINT = "job.checkpoint";
-
- public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
-
- // time in min
- public static final int DEFAULT_JOB_FILE_MAX_WAIT = 1;
-
- public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait";
-
- public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 3;
-
- public static final String JOB_ID_PREFIX = "job_";
-
- public static final String SQL_JOB_ID = "sql_job_id";
-
- public static final String JOB_STORE_TIME = "job.store.time";
-
- public static final String JOB_OP = "job.op";
-
- public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; //
TODO:delete it
-
- // field splitter
- public static final String JOB_FIELD_SPLITTER = "job.splitter";
-
- // job delivery time
- public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
-
- // job time reading file
- public static final String JOB_DATA_TIME = "job.dataTime";
-
- // job of the number of seconds to wait before starting the task
- public static final String JOB_TASK_BEGIN_WAIT_SECONDS =
"job.taskWaitSeconds";
-
- /**
- * when job is retried, the retry time should be provided
- */
- public static final String JOB_RETRY_TIME = "job.retryTime";
-
- /**
- * delimiter to split offset for different task
- */
- public static final String JOB_OFFSET_DELIMITER = "_";
-
- /**
- * delimiter to split all partition offset for all kafka tasks
- */
- public static final String JOB_KAFKA_PARTITION_OFFSET_DELIMITER = "#";
-
- /**
- * sync send data when sending to DataProxy
- */
- public static final int SYNC_SEND_OPEN = 1;
-
- public static final String INTERVAL_MILLISECONDS = "1000";
-
- /**
- * monitor switch, 1 true and 0 false
- */
- public static final String JOB_FILE_MONITOR_DEFAULT_STATUS = "1";
-
- /**
- * monitor expire time and the time in milliseconds.
- * default value is -1 and stand for not expire time.
- */
- public static final String JOB_FILE_MONITOR_DEFAULT_EXPIRE = "-1";
-
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
deleted file mode 100644
index bc401ddb91..0000000000
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.constant;
-
-/**
- * k8s information
- */
-public class KubernetesConstants {
-
- public static final String KUBERNETES = "kubernetes";
- public static final String HTTPS = "https://";
- public static final String KUBERNETES_SERVICE_HOST =
"KUBERNETES_SERVICE_HOST";
- public static final String KUBERNETES_SERVICE_PORT =
"KUBERNETES_SERVICE_PORT";
-
- // k8s information
- public static final String NAMESPACE = "namespace";
- public static final String POD_NAME = "pod.name";
- public static final String CONTAINER_NAME = "container.name";
- public static final String CONTAINER_ID = "container.id";
-
-}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
deleted file mode 100644
index 9696440de5..0000000000
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.constant;
-
-/**
- * Metadata type
- */
-public class MetadataConstants {
-
- public static final String DATA_CONTENT = "__content__";
- public static final String DATA_CONTENT_TIME = "__LogTime__";
-
- // k8s metadata
- public static final String METADATA_CONTAINER_ID = "__container_id__";
- public static final String METADATA_CONTAINER_NAME = "__container_name__";
- public static final String METADATA_NAMESPACE = "__namespace__";
- public static final String METADATA_POD_UID = "__pod_uid__";
- public static final String METADATA_POD_NAME = "__pod_name__";
- public static final String METADATA_POD_LABEL = "__pod_label__";
-
- // cvm metadata
- public static final String ENV_CVM = "cvm";
- public static final String METADATA_HOST_NAME = "__HostName__";
- public static final String METADATA_SOURCE_IP = "__SourceIP__";
- public static final String METADATA_FILE_NAME = "__FileName__";
-}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 23d6fe8dc2..34957fd65f 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -59,9 +59,6 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT =
"task.fileTask.cycleUnit";
public static final String TASK_FILE_CONTENT_COLLECT_TYPE =
"task.fileTask.contentCollectType";
- public static final String TASK_FILE_META_ENV_LIST =
"task.fileTask.envList";
- public static final String TASK_FILE_META_FILTER_BY_LABELS =
"task.fileTask.filterMetaByLabels";
- public static final String TASK_FILE_PROPERTIES =
"task.fileTask.properties";
public static final String SOURCE_DATA_CONTENT_STYLE =
"task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR =
"task.fileTask.dataSeparator";
public static final String TASK_RETRY = "task.fileTask.retry";
@@ -73,24 +70,6 @@ public class TaskConstants extends CommonConstants {
public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
"org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
- // Binlog task
- public static final String TASK_DATABASE_USER = "task.binlogTask.user";
- public static final String TASK_DATABASE_PASSWORD =
"task.binlogTask.password";
- public static final String TASK_DATABASE_HOSTNAME =
"task.binlogTask.hostname";
- public static final String TASK_TABLE_WHITELIST =
"task.binlogTask.tableWhiteList";
- public static final String TASK_DATABASE_WHITELIST =
"task.binlogTask.databaseWhiteList";
- public static final String TASK_DATABASE_OFFSETS =
"task.binlogTask.offsets";
- public static final String TASK_DATABASE_OFFSET_FILENAME =
"task.binlogTask.offset.filename";
-
- public static final String TASK_DATABASE_SERVER_TIME_ZONE =
"task.binlogTask.serverTimezone";
- public static final String TASK_DATABASE_STORE_OFFSET_INTERVAL_MS =
"task.binlogTask.offset.intervalMs";
-
- public static final String TASK_DATABASE_STORE_HISTORY_FILENAME =
"task.binlogTask.history.filename";
- public static final String TASK_DATABASE_INCLUDE_SCHEMA_CHANGES =
"task.binlogTask.schema";
- public static final String TASK_DATABASE_SNAPSHOT_MODE =
"task.binlogTask.snapshot.mode";
- public static final String TASK_DATABASE_HISTORY_MONITOR_DDL =
"task.binlogTask.ddl";
- public static final String TASK_DATABASE_PORT = "task.binlogTask.port";
-
// Kafka task
public static final String TASK_KAFKA_TOPIC = "task.kafkaTask.topic";
public static final String TASK_KAFKA_BOOTSTRAP_SERVERS =
"task.kafkaTask.bootstrap.servers";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index df7ab22053..6397ecf5db 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -19,7 +19,6 @@ package org.apache.inlong.agent.pojo;
import lombok.Data;
-import java.util.List;
import java.util.Map;
@Data
@@ -43,18 +42,10 @@ public class FileTask {
// FULL
private String contentCollectType;
- private String envList;
-
- // JSON string, the content format is List<Map<String, String>>
- private String metaFields;
-
private String dataContentStyle;
private String dataSeparator;
- // JSON string, the content format is Map<String,string>
- private String filterMetaByLabels;
-
// JSON string, the content format is Map<String,Object>
private String properties;
@@ -122,11 +113,6 @@ public class FileTask {
// Type of file content, for example: FULL, INCREMENT
private String contentCollectType;
- // File needs to collect environment information, for example:
kubernetes
- private String envList;
- // Metadata of data, for example:
- // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and
so on
- private List<Map<String, String>> metaFields;
// Type of data result for column separator
// CSV format, set this parameter to a custom separator: , | :
// Json format, set this parameter to json
@@ -135,9 +121,6 @@ public class FileTask {
// Column separator of data source
private String dataSeparator;
- // Metadata filters by label, special parameters for K8S
- private Map<String, String> filterMetaByLabels;
-
// Properties for file
private Map<String, Object> properties;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 0b6d5bcc2a..374b7331d4 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -172,18 +172,6 @@ public class TaskProfileDto {
fileTask.setLine(line);
}
- if (null != taskConfig.getEnvList()) {
- fileTask.setEnvList(taskConfig.getEnvList());
- }
-
- if (null != taskConfig.getMetaFields()) {
- fileTask.setMetaFields(GSON.toJson(taskConfig.getMetaFields()));
- }
-
- if (null != taskConfig.getFilterMetaByLabels()) {
-
fileTask.setFilterMetaByLabels(GSON.toJson(taskConfig.getFilterMetaByLabels()));
- }
-
if (null != taskConfig.getMonitorInterval()) {
fileTask.setMonitorInterval(taskConfig.getMonitorInterval());
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
deleted file mode 100644
index 863eaf7952..0000000000
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.utils;
-
-import org.apache.inlong.agent.conf.AbstractConfiguration;
-import org.apache.inlong.agent.constant.CommonConstants;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.commons.lang3.StringUtils;
-
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
-import static
org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
-import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
-import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_META_FILTER_BY_LABELS;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_PROPERTIES;
-
-/**
- * Metadata utils
- */
-public class MetaDataUtils {
-
- private static final Gson GSON = new Gson();
-
- private static final String LOG_MARK = ".log";
-
- // standard log path for k8s
- private static final String FILE_NAME_PATTERN =
"(^[-a-zA-Z0-9]+)_([a-zA-Z0-9-]+)_([a-zA-Z0-9-]+)(.log)";
-
- private static final Pattern PATTERN = Pattern.compile(FILE_NAME_PATTERN);
-
- /**
- * standard log for k8s
- *
- * get pod_name,namespace,container_name,container_id
- */
- public static Map<String, String> getLogInfo(String fileName) {
- Matcher matcher = PATTERN.matcher(fileName);
- Map<String, String> podInf = new HashMap<>();
- if (StringUtils.isBlank(fileName) || !matcher.matches()) {
- return podInf;
- }
- // file name example:
/var/log/containers/<pod_name>_<namespace>_<container_name>-<continer_id>.log
- String[] str = fileName.split(CommonConstants.DELIMITER_UNDERLINE);
- podInf.put(POD_NAME, str[0]);
- podInf.put(NAMESPACE, str[1]);
- String[] containerInfo =
str[2].split(CommonConstants.DELIMITER_HYPHEN);
- String containerId = containerInfo[containerInfo.length -
1].replace(LOG_MARK, "");
- String containerName = "";
- for (int i = 0; i < containerInfo.length - 1; i++) {
- if (i == containerInfo.length - 2) {
- containerName = containerName.concat(containerInfo[i]);
- break;
- }
- containerName =
containerName.concat(containerInfo[i]).concat(CommonConstants.DELIMITER_HYPHEN);
- }
- podInf.put(CONTAINER_NAME, containerName);
- podInf.put(CONTAINER_ID, containerId);
- return podInf;
- }
-
- /**
- * standard log for k8s
- *
- * get labels of pod
- */
- public static Map<String, String> getPodLabels(AbstractConfiguration
taskProfile) {
- if (Objects.isNull(taskProfile) ||
!taskProfile.hasKey(TASK_FILE_META_FILTER_BY_LABELS)) {
- return new HashMap<>();
- }
- String labels = taskProfile.get(TASK_FILE_META_FILTER_BY_LABELS);
- Type type = new TypeToken<HashMap<String, String>>() {
- }.getType();
- return GSON.fromJson(labels, type);
- }
-
- public static List<String> getNamespace(AbstractConfiguration taskProfile)
{
- if (Objects.isNull(taskProfile) ||
!taskProfile.hasKey(TASK_FILE_PROPERTIES)) {
- return null;
- }
- String property = taskProfile.get(TASK_FILE_PROPERTIES);
- Type type = new TypeToken<HashMap<Integer, String>>() {
- }.getType();
- Map<String, String> properties = GSON.fromJson(property, type);
- return properties.keySet().stream().map(data -> {
- if (data.contains(NAMESPACE)) {
- return properties.get(data);
- }
- return null;
- }).filter(Objects::nonNull).collect(Collectors.toList());
- }
-
- /**
- * standard log for k8s
- *
- * get name of pod
- */
- public static String getPodName(AbstractConfiguration taskProfile) {
- if (Objects.isNull(taskProfile) ||
!taskProfile.hasKey(TASK_FILE_PROPERTIES)) {
- return null;
- }
- String property = taskProfile.get(TASK_FILE_PROPERTIES);
- Type type = new TypeToken<HashMap<Integer, String>>() {
- }.getType();
- Map<String, String> properties = GSON.fromJson(property, type);
- List<String> podName = properties.keySet().stream().map(data -> {
- if (data.contains(POD_NAME)) {
- return properties.get(data);
- }
- return null;
- }).filter(Objects::nonNull).collect(Collectors.toList());
- return podName.isEmpty() ? null : podName.get(0);
- }
-}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index db0c1ce6d3..c395fedb05 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -18,30 +18,22 @@
package org.apache.inlong.agent.plugin.utils;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.task.PathPattern;
import org.apache.inlong.agent.utils.AgentUtils;
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -51,9 +43,6 @@ import static
org.apache.inlong.agent.constant.CommonConstants.AGENT_NIX_OS;
import static org.apache.inlong.agent.constant.CommonConstants.AGENT_NUX_OS;
import static org.apache.inlong.agent.constant.CommonConstants.AGENT_OS_NAME;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
-import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
-import static
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
-import static
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
import static
org.apache.inlong.agent.constant.TaskConstants.FILE_DIR_FILTER_PATTERNS;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_TIME_OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY_TIME;
@@ -156,22 +145,4 @@ public class PluginUtils {
}
}
}
-
- // TODO only support default config in the POD
- public static KubernetesClient getKubernetesClient() throws IOException {
- String ip = System.getenv(KUBERNETES_SERVICE_HOST);
- String port = System.getenv(KUBERNETES_SERVICE_PORT);
- if (Objects.isNull(ip) && Objects.isNull(port)) {
- throw new RuntimeException("get k8s client error,k8s env ip and
port is null");
- }
- String maserUrl =
HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
- Config config = new ConfigBuilder()
- .withMasterUrl(maserUrl)
- .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
- .withOauthToken(new String(
- Files.readAllBytes((new
File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
- .build();
- return new KubernetesClientBuilder().withConfig(config).build();
- }
-
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
index 0b7d310a47..57b4702848 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java
@@ -17,143 +17,21 @@
package org.apache.inlong.agent.plugin.utils.file;
-import org.apache.inlong.agent.conf.AbstractConfiguration;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
-import org.apache.inlong.agent.plugin.utils.PluginUtils;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.reflect.TypeToken;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.dsl.PodResource;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
-import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
/**
* File job utils
*/
public class FileDataUtils {
- public static final String KUBERNETES_LOG = "log";
- private static final Logger LOGGER =
LoggerFactory.getLogger(FileDataUtils.class);
- private static final Gson GSON = new Gson();
-
public static String getInodeInfo(String fileName) throws IOException {
BasicFileAttributes attributesAfter;
Path path = Paths.get(fileName);
attributesAfter = Files.readAttributes(path,
BasicFileAttributes.class);
return attributesAfter.fileKey().toString();
}
-
- /**
- * Get standard log for k8s
- */
- public static String getK8sJsonLog(String log, Boolean isJson) {
- if (!StringUtils.isNoneBlank(log)) {
- return "";
- }
- if (!isJson) {
- return log;
- }
- Type type = new TypeToken<HashMap<String, String>>() {
- }.getType();
- Map<String, String> logJson = GSON.fromJson(log, type);
- return logJson.getOrDefault(KUBERNETES_LOG, log);
- }
-
- /**
- * To judge json
- */
- public static boolean isJSON(String json) {
- boolean isJson;
- try {
- JsonObject convertedObject = new Gson().fromJson(json,
JsonObject.class);
- isJson = convertedObject.isJsonObject();
- } catch (Exception exception) {
- return false;
- }
- return isJson;
- }
-
- /**
- * Filter file by conditions
- */
- public static Collection<File> filterFile(Collection<File> allFiles,
AbstractConfiguration jobConf) {
- // filter file by labels
- Collection<File> files = null;
- try {
- files = filterByLabels(allFiles, jobConf);
- } catch (IOException e) {
- LOGGER.error("filter file error: ", e);
- }
- return files;
- }
-
- /**
- * Filter file by labels if standard log for k8s
- */
- private static Collection<File> filterByLabels(Collection<File> allFiles,
AbstractConfiguration jobConf)
- throws IOException {
- Map<String, String> labelsMap = MetaDataUtils.getPodLabels(jobConf);
- if (labelsMap.isEmpty()) {
- return allFiles;
- }
- Collection<File> standardK8sLogFiles = new ArrayList<>();
- Iterator<File> iterator = allFiles.iterator();
- KubernetesClient client = PluginUtils.getKubernetesClient();
- while (iterator.hasNext()) {
- File file = getFile(labelsMap, iterator.next(), client);
- if (file == null) {
- continue;
- }
- standardK8sLogFiles.add(file);
- }
- return standardK8sLogFiles;
- }
-
- private static File getFile(Map<String, String> labelsMap, File file,
KubernetesClient client) {
- Map<String, String> logInfo = MetaDataUtils.getLogInfo(file.getName());
- if (logInfo.isEmpty()) {
- return null;
- }
- PodResource podResource =
client.pods().inNamespace(logInfo.get(NAMESPACE))
- .withName(logInfo.get(POD_NAME));
- if (Objects.isNull(podResource)) {
- return null;
- }
- Pod pod = podResource.get();
- Map<String, String> podLabels = pod.getMetadata().getLabels();
- boolean filterLabelStatus = false;
- for (String key : labelsMap.keySet()) {
- if (podLabels.containsKey(key) &&
labelsMap.get(key).contains(podLabels.get(key))) {
- filterLabelStatus = true;
- continue;
- }
- if (podLabels.containsKey(key) &&
!labelsMap.get(key).contains(podLabels.get(key))) {
- filterLabelStatus = false;
- break;
- }
- }
- return filterLabelStatus ? file : null;
- }
-
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 86431c4ee7..5d1f79f50f 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -28,8 +28,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
import static org.apache.inlong.agent.constant.TaskConstants.SINK_DATA_TIME;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
public class MockSink extends AbstractSink {
@@ -59,7 +59,7 @@ public class MockSink extends AbstractSink {
public void init(InstanceProfile jobConf) {
super.init(jobConf);
dataTime =
AgentUtils.timeStrConvertToMillSec(jobConf.get(SINK_DATA_TIME, ""),
- jobConf.get(JOB_CYCLE_UNIT, ""));
+ jobConf.get(TASK_CYCLE_UNIT, ""));
sourceFileName = "test";
LOGGER.info("get dataTime is : {}", dataTime);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/MetaDataUtilsTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/MetaDataUtilsTest.java
deleted file mode 100644
index 5dfc655e53..0000000000
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/MetaDataUtilsTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.utils;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * metadata of k8s utils test
- */
-public class MetaDataUtilsTest {
-
- @Test
- public void getLogInfo() {
- String fileName = "testcase-0_xb-test240_testcase2"
- +
"-8050825882878a0aef05cd597abb09917a1e090d09f4d1ed288488311ca0309c.log";
- Map<String, String> metaMap = MetaDataUtils.getLogInfo(fileName);
- Assert.assertEquals(4, metaMap.size());
- }
-}
\ No newline at end of file