This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7ccbd97a6e [INLONG-11571][Agent] Add classes for actual collection of
COS source (#11572)
7ccbd97a6e is described below
commit 7ccbd97a6e20312d9ff0debdde18dd482b065442
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Dec 4 16:56:46 2024 +0800
[INLONG-11571][Agent] Add classes for actual collection of COS source
(#11572)
* [INLONG-11571][Agent] Add classes for actual collection of COS source
* [INLONG-11571][Agent] Delete invalid comments
* Update
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
Co-authored-by: AloysZhang <[email protected]>
* [INLONG-11571][Agent] Modify code based on comments
---------
Co-authored-by: AloysZhang <[email protected]>
---
inlong-agent/agent-plugins/pom.xml | 5 +
.../agent/plugin/fetcher/ManagerFetcher.java | 3 +-
.../inlong/agent/plugin/instance/COSInstance.java | 35 ++
.../agent/plugin/instance/CommonInstance.java | 2 +-
.../inlong/agent/plugin/sinks/ProxySink.java | 12 +-
.../inlong/agent/plugin/sources/COSSource.java | 356 +++++++++++++++++++++
.../inlong/agent/plugin/sources/LogFileSource.java | 2 +-
.../agent/plugin/sources/file/AbstractSource.java | 14 +-
.../inlong/agent/plugin/task/cos/COSTask.java | 338 +++++++++++++++++++
.../inlong/agent/plugin/task/cos/FileScanner.java | 165 ++++++++++
.../agent/plugin/utils/cos/COSConfigHandler.java | 26 ++
.../inlong/agent/plugin/utils/cos/COSUtils.java | 33 ++
.../plugin/utils/cos/DefaultCOSConfigHandler.java | 28 ++
licenses/inlong-agent/LICENSE | 1 +
.../inlong-agent/licenses/LICENSE-cos-java-sdk.txt | 21 ++
pom.xml | 1 +
16 files changed, 1025 insertions(+), 17 deletions(-)
diff --git a/inlong-agent/agent-plugins/pom.xml
b/inlong-agent/agent-plugins/pom.xml
index 4c77f84dc3..863e285ab2 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -92,6 +92,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>cos_api</artifactId>
+ <version>${cos.sdk.version}</version>
+ </dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 01d7128f8b..3ab63b0118 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
import org.apache.inlong.common.pojo.agent.AgentResponseCode;
@@ -237,7 +238,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
dataConfig.setInlongGroupId("devcloud_group_id");
dataConfig.setInlongStreamId("devcloud_stream_id");
dataConfig.setDataReportType(0);
- dataConfig.setTaskType(3);
+ dataConfig.setTaskType(TaskTypeEnum.FILE.getType());
dataConfig.setTaskId(taskId);
dataConfig.setState(state);
dataConfig.setTimeZone("GMT+8:00");
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java
new file mode 100644
index 0000000000..6f40f727dd
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/COSInstance.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+/**
+ * Instance for one COS object. As with every {@link CommonInstance}, it owns a source and a sink
+ * and moves data read from the source into the sink.
+ */
+public class COSInstance extends CommonInstance {
+
+    /**
+     * Stores an empty inode marker in the instance profile.
+     *
+     * <p>NOTE(review): presumably left empty because a COS object has no local inode to track — confirm.
+     *
+     * @param profile instance profile to update
+     * @throws IOException declared by the overridden method; never thrown here
+     */
+    @Override
+    public void setInodeInfo(InstanceProfile profile) throws IOException {
+        final String emptyInode = "";
+        profile.set(TaskConstants.INODE_INFO, emptyInode);
+    }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 7267066aee..6cc97a159c 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -122,7 +122,7 @@ public abstract class CommonInstance extends Instance {
@Override
public void run() {
- Thread.currentThread().setName("file-instance-core-" + getTaskId() +
"-" + getInstanceId());
+ Thread.currentThread().setName("instance-core-" + getTaskId() + "-" +
getInstanceId());
running = true;
try {
doRun();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 2f54ec59de..e00ad65cba 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -196,18 +196,18 @@ public class ProxySink extends AbstractSink {
}
Long start = AgentUtils.getCurrentTime();
shutdown = true;
+ senderManager.Stop();
+ LOGGER.info("destroy proxySink, wait for sender close {} ms instance
{}", AgentUtils.getCurrentTime() - start,
+ profile.getInstanceId());
+ start = AgentUtils.getCurrentTime();
while (running || offsetRunning) {
AgentUtils.silenceSleepInMs(LOOP_WAIT_TIME_MS);
}
- LOGGER.info("destroy proxySink wait run elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
- profile.getInstanceId());
- start = AgentUtils.getCurrentTime();
- senderManager.Stop();
- LOGGER.info("destroy proxySink wait sender elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
+ LOGGER.info("destroy proxySink, wait for run close {} ms instance {}",
AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
clearOffset();
- LOGGER.info("destroy proxySink wait offset elapse {} ms instance {}",
AgentUtils.getCurrentTime() - start,
+ LOGGER.info("destroy proxySink, wait for offset clear {} ms instance
{}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
LOGGER.info("destroy sink {} end", sourceName);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
new file mode 100755
index 0000000000..b6792ac82d
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/COSSource.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.FileStaticManager;
+import org.apache.inlong.agent.core.FileStaticManager.FileStatic;
+import org.apache.inlong.agent.core.task.MemoryManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
+import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.model.COSObject;
+import com.qcloud.cos.model.GetObjectRequest;
+import com.qcloud.cos.model.ObjectMetadata;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.TaskConstants.COS_CONTENT_STYLE;
+
+/**
+ * Read COS files
+ */
+public class COSSource extends AbstractSource {
+
+ public static final int LEN_OF_FILE_OFFSET_ARRAY = 2;
+ public static final String AGENT_GLOBAL_COS_SOURCE_PERMIT =
"agent.global.cos.source.permit";
+ public static final int DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT = 128 *
1000 * 1000;
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ protected class FileOffset {
+
+ private Long lineOffset;
+ private Long byteOffset;
+ private boolean hasByteOffset;
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(COSSource.class);
+ public static final String OFFSET_SEP = ":";
+ protected final Integer WAIT_TIMEOUT_MS = 10;
+ private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 1024 * 1024;
+ private final Long META_UPDATE_INTERVAL_MS = 10000L;
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
+
+ private String fileName;
+ private byte[] bufferToReadFile;
+ public volatile long linePosition = 0;
+ public volatile long bytePosition = 0;
+ private volatile boolean fileExist = true;
+ private volatile long lastInodeUpdateTime = 0;
+ private COSClient cosClient;
+ private String bucketName;
+ private String secretId;
+ private String secretKey;
+ private String strRegion;
+ private ObjectMetadata metadata;
+ protected BlockingQueue<SourceData> queue;
+ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
+ 0, Integer.MAX_VALUE,
+ 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new AgentThreadFactory("cos-source-pool"));
+ private volatile boolean running = false;
+
+ public COSSource() {
+ }
+
+ @Override
+ protected void initExtendClass() {
+ extendClass = DefaultExtendedHandler.class.getCanonicalName();
+ }
+
+ @Override
+ protected void initSource(InstanceProfile profile) {
+ try {
+ String offset = "";
+ if (offsetProfile != null) {
+ offset = offsetProfile.toJsonStr();
+ }
+ LOGGER.info("LogFileSource init: {} offset: {}",
profile.toJsonStr(), offset);
+ AgentConfiguration conf = AgentConfiguration.getAgentConf();
+ int permit = conf.getInt(AGENT_GLOBAL_COS_SOURCE_PERMIT,
DEFAULT_AGENT_GLOBAL_COS_SOURCE_PERMIT);
+
MemoryManager.getInstance().addSemaphore(AGENT_GLOBAL_COS_SOURCE_PERMIT,
permit);
+ fileName = profile.getInstanceId();
+ bucketName = profile.get(TaskConstants.COS_TASK_BUCKET_NAME);
+ secretId = profile.get(TaskConstants.COS_TASK_SECRET_ID);
+ secretKey = profile.get(TaskConstants.COS_TASK_SECRET_KEY);
+ strRegion = profile.get(TaskConstants.COS_TASK_REGION);
+ cosClient = COSUtils.createCli(secretId, secretKey, strRegion);
+ metadata = cosClient.getObjectMetadata(bucketName, fileName);
+ queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
+ bufferToReadFile = new byte[SIZE_OF_BUFFER_TO_READ_FILE];
+ lastInodeUpdateTime = AgentUtils.getCurrentTime();
+ initOffset(taskId);
+ EXECUTOR_SERVICE.execute(run());
+ } catch (Exception ex) {
+ stopRunning();
+ throw new FileException("error init stream for " + fileName, ex);
+ }
+ }
+
+ @Override
+ protected boolean doPrepareToRead() {
+ if (AgentUtils.getCurrentTime() - lastInodeUpdateTime >
META_UPDATE_INTERVAL_MS) {
+ metadata = cosClient.getObjectMetadata(bucketName, fileName);
+ lastInodeUpdateTime = AgentUtils.getCurrentTime();
+ }
+ if (metadata.getContentLength() < bytePosition) {
+ fileExist = false;
+ LOGGER.info("file rotate, instance will restart and offset will be
clean, file {}",
+ fileName);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected List<SourceData> readFromSource() {
+ if (queue.isEmpty()) {
+ return null;
+ }
+ int count = 0;
+ int len = 0;
+ List<SourceData> lines = new ArrayList<>();
+ while (!queue.isEmpty() && count < BATCH_READ_LINE_COUNT && len <
BATCH_READ_LINE_TOTAL_LEN) {
+ if (len + queue.peek().getData().length >
BATCH_READ_LINE_TOTAL_LEN) {
+ break;
+ }
+ len += queue.peek().getData().length;
+ count++;
+ lines.add(queue.poll());
+ }
+ MemoryManager.getInstance().release(AGENT_GLOBAL_COS_SOURCE_PERMIT,
len);
+ return lines;
+ }
+
+ @Override
+ protected void printCurrentState() {
+ LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len
{}", fileName, linePosition,
+ bytePosition, metadata.getContentLength());
+ }
+
+ @Override
+ protected String getThreadName() {
+ return "cos-file-source-" + taskId + "-" + fileName;
+ }
+
+ private void initOffset(String taskId) {
+ long lineOffset;
+ long byteOffset;
+ if (offsetProfile != null) {
+ FileOffset fileOffset = parseFIleOffset(offsetProfile.getOffset());
+ lineOffset = fileOffset.lineOffset;
+ byteOffset = fileOffset.byteOffset;
+ LOGGER.info("initOffset inode no change taskId {} restore
lineOffset {} byteOffset {}, file {}", taskId,
+ lineOffset, byteOffset, fileName);
+ } else {
+ lineOffset = 0;
+ byteOffset = 0;
+ LOGGER.info("initOffset taskId {} for new all read lineOffset {}
byteOffset {} file {}", taskId,
+ lineOffset, byteOffset, fileName);
+ }
+ linePosition = lineOffset;
+ bytePosition = byteOffset;
+ }
+
+ private Runnable run() {
+ return () -> {
+ AgentThreadFactory.nameThread(getThreadName());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error maybe file deleted: ", e);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ }
+ running = false;
+ };
+ }
+
+ /**
+ * Read new lines.
+ *
+ * @return The new position after the lines have been read
+ * @throws IOException if an I/O error occurs.
+ */
+ protected void doRun() throws IOException {
+ GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName,
fileName);
+ getObjectRequest.setRange(bytePosition, metadata.getContentLength());
+ COSObject cosObject = cosClient.getObject(getObjectRequest);
+ InputStream inputStream = cosObject.getObjectContent();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ int num;
+ boolean overLen = false;
+ while ((num = inputStream.read(bufferToReadFile)) != -1) {
+ LOGGER.debug("read size {}", num);
+ for (int i = 0; i < num; i++) {
+ byte ch = bufferToReadFile[i];
+ bytePosition++;
+ switch (ch) {
+ case '\n':
+ linePosition++;
+ boolean suc = false;
+ while (isRunnable() && !suc) {
+ SourceData sourceData = new
SourceData(baos.toByteArray(),
+ getOffsetString(linePosition,
bytePosition));
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_COS_SOURCE_PERMIT,
+ sourceData.getData().length);
+ if (!suc4Queue) {
+ break;
+ }
+ suc = queue.offer(sourceData);
+ if (!suc) {
+ MemoryManager.getInstance()
+
.release(AGENT_GLOBAL_COS_SOURCE_PERMIT, sourceData.getData().length);
+ AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+ }
+ }
+ if (overLen) {
+ LOGGER.warn("readLines over len finally string len
{}",
+ new String(baos.toByteArray()).length());
+ long auditTime = 0;
+ auditTime = profile.getSinkDataTime();
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId,
inlongStreamId,
+ auditTime, 1, maxPackSize, auditVersion);
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId,
+ inlongStreamId,
AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion);
+ }
+ baos.reset();
+ overLen = false;
+ break;
+ case '\r':
+ break;
+ default:
+ if (baos.size() < maxPackSize) {
+ baos.write(ch);
+ } else {
+ overLen = true;
+ }
+ }
+ }
+ }
+ baos.close();
+ inputStream.close();
+ cosObject.close();
+ }
+
+ private String getOffsetString(Long lineOffset, Long byteOffset) {
+ return lineOffset + OFFSET_SEP + byteOffset;
+ }
+
+ private FileOffset parseFIleOffset(String offset) {
+ String[] offsetArray = offset.split(OFFSET_SEP);
+ if (offsetArray.length == LEN_OF_FILE_OFFSET_ARRAY) {
+ return new FileOffset(Long.parseLong(offsetArray[0]),
Long.parseLong(offsetArray[1]), true);
+ } else {
+ return new FileOffset(Long.parseLong(offsetArray[0]), null, false);
+ }
+ }
+
+ @Override
+ protected boolean isRunnable() {
+ return runnable && fileExist;
+ }
+
+ @Override
+ public boolean sourceExist() {
+ return fileExist;
+ }
+
+ @Override
+ protected void releaseSource() {
+ while (running) {
+ AgentUtils.silenceSleepInMs(1);
+ }
+ if (cosClient != null) {
+ FileStatic data = new FileStatic();
+ data.setTaskId(taskId);
+ data.setRetry(String.valueOf(profile.isRetry()));
+ data.setContentType(profile.get(COS_CONTENT_STYLE));
+ data.setGroupId(profile.getInlongGroupId());
+ data.setStreamId(profile.getInlongStreamId());
+ data.setDataTime(format.format(profile.getSinkDataTime()));
+ data.setFileName(profile.getInstanceId());
+ data.setFileLen(String.valueOf(metadata.getContentLength()));
+ data.setReadBytes(String.valueOf(bytePosition));
+ data.setReadLines(String.valueOf(linePosition));
+ OffsetProfile offsetProfile =
OffsetManager.getInstance().getOffset(taskId, instanceId);
+ if (offsetProfile != null) {
+ data.setSendLines(offsetProfile.getOffset());
+ }
+ FileStaticManager.putStaticMsg(data);
+ cosClient.shutdown();
+ }
+ while (!queue.isEmpty()) {
+
MemoryManager.getInstance().release(AGENT_GLOBAL_COS_SOURCE_PERMIT,
queue.poll().getData().length);
+ }
+ }
+
+ private boolean waitForPermit(String permitName, int permitLen) {
+ boolean suc = false;
+ while (!suc) {
+ suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
+ if (!suc) {
+ MemoryManager.getInstance().printDetail(permitName,
"cos_source");
+ if (!isRunnable()) {
+ return false;
+ }
+ AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
+ }
+ }
+ return true;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 300c16168f..ecaa53196b 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -71,7 +71,7 @@ public class LogFileSource extends AbstractSource {
public static final String OFFSET_SEP = ":";
private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 64 * 1024;
private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
- private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss"); // 设置格式
+ private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
private String fileName;
private File file;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 3ce1378fa7..72d14327a2 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -165,7 +165,7 @@ public abstract class AbstractSource implements Source {
break;
}
List<SourceData> lines = readFromSource();
- if (lines != null && lines.isEmpty()) {
+ if (lines == null || lines.isEmpty()) {
if (queue.isEmpty()) {
emptyCount++;
} else {
@@ -176,14 +176,12 @@ public abstract class AbstractSource implements Source {
continue;
}
emptyCount = 0;
- if (lines != null) {
- for (int i = 0; i < lines.size(); i++) {
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(lines.get(i));
+ for (int i = 0; i < lines.size(); i++) {
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+ if (!suc4Queue) {
+ break;
}
+ putIntoQueue(lines.get(i));
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
new file mode 100644
index 0000000000..eceb38e8be
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/COSTask.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.cos;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.plugin.task.AbstractTask;
+import org.apache.inlong.agent.plugin.task.cos.FileScanner.BasicFileInfo;
+import org.apache.inlong.agent.plugin.utils.cos.COSUtils;
+import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Watch directory, if new valid files are created, create instance
correspondingly.
+ */
+public class COSTask extends AbstractTask {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(COSTask.class);
+ public static final String DEFAULT_COS_INSTANCE =
"org.apache.inlong.agent.plugin.instance.COSInstance";
+ private static final int INSTANCE_QUEUE_CAPACITY = 10;
+ private final Map<String/* dataTime */, Map<String/* fileName */,
InstanceProfile>> eventMap =
+ new ConcurrentHashMap<>();
+ public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
+ private boolean retry;
+ private long startTime;
+ private long endTime;
+ private String originPattern;
+ private long lastScanTime = 0;
+ public final long SCAN_INTERVAL = 1 * 60 * 1000;
+ private volatile boolean runAtLeastOneTime = false;
+ private BlockingQueue<InstanceProfile> instanceQueue;
+ private COSClient cosClient;
+ private String bucketName;
+ private String secretId;
+ private String secretKey;
+ private String strRegion;
+ private String timeOffset = "";
+
+ @Override
+ protected int getInstanceLimit() {
+ return taskProfile.getInt(TaskConstants.COS_MAX_NUM);
+ }
+
+ @Override
+ protected void initTask() {
+ timeOffset = taskProfile.get(TaskConstants.TASK_COS_TIME_OFFSET, "");
+ instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);
+ retry = taskProfile.getBoolean(TaskConstants.COS_TASK_RETRY, false);
+ originPattern = taskProfile.get(TaskConstants.COS_TASK_PATTERN);
+ bucketName = taskProfile.get(TaskConstants.COS_TASK_BUCKET_NAME);
+ secretId = taskProfile.get(TaskConstants.COS_TASK_SECRET_ID);
+ secretKey = taskProfile.get(TaskConstants.COS_TASK_SECRET_KEY);
+ strRegion = taskProfile.get(TaskConstants.COS_TASK_REGION);
+ cosClient = COSUtils.createCli(secretId, secretKey, strRegion);
+ if (retry) {
+ initRetryTask(taskProfile);
+ }
+ }
+
+ private boolean initRetryTask(TaskProfile profile) {
+ String dataTimeFrom = profile.get(TaskConstants.COS_TASK_TIME_FROM,
"");
+ String dataTimeTo = profile.get(TaskConstants.COS_TASK_TIME_TO, "");
+ try {
+ startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom,
profile.getCycleUnit());
+ endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo,
profile.getCycleUnit());
+ } catch (ParseException e) {
+ LOGGER.error("retry task time error start {} end {}",
dataTimeFrom, dataTimeTo, e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected List<InstanceProfile> getNewInstanceList() {
+ if (retry) {
+ runForRetry();
+ } else {
+ runForNormal();
+ }
+ List<InstanceProfile> list = new ArrayList<>();
+ while (list.size() < INSTANCE_QUEUE_CAPACITY &&
!instanceQueue.isEmpty()) {
+ InstanceProfile profile = instanceQueue.poll();
+ if (profile != null) {
+ list.add(profile);
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.COS_TASK_CYCLE_UNIT)) {
+ LOGGER.error("task profile needs cos cycle unit");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) {
+ LOGGER.error("task profile needs cycle unit");
+ return false;
+ }
+ if (profile.get(TaskConstants.TASK_CYCLE_UNIT)
+ .compareTo(profile.get(TaskConstants.COS_TASK_CYCLE_UNIT)) !=
0) {
+ LOGGER.error("task profile cycle unit must be consistent");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_TIME_ZONE)) {
+ LOGGER.error("task profile needs time zone");
+ return false;
+ }
+ boolean ret = profile.hasKey(TaskConstants.COS_TASK_PATTERN)
+ && profile.hasKey(TaskConstants.COS_MAX_NUM);
+ if (!ret) {
+ LOGGER.error("task profile needs file keys");
+ return false;
+ }
+ if (!profile.hasKey(TaskConstants.TASK_COS_TIME_OFFSET)) {
+ LOGGER.error("task profile needs time offset");
+ return false;
+ }
+ if (profile.getBoolean(TaskConstants.COS_TASK_RETRY, false)) {
+ if (!initRetryTask(profile)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ protected void releaseTask() {
+ cosClient.shutdown();
+ }
+
+ private void runForRetry() {
+ if (!runAtLeastOneTime) {
+ scanExistingFile();
+ runAtLeastOneTime = true;
+ }
+ dealWithEventMap();
+ if (allInstanceFinished()) {
+ LOGGER.info("retry task finished, send action to task manager,
taskId {}", getTaskId());
+ TaskAction action = new
TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile);
+ taskManager.submitAction(action);
+ doChangeState(State.SUCCEEDED);
+ }
+ }
+
+ private void runForNormal() {
+ if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) {
+ scanExistingFile();
+ lastScanTime = AgentUtils.getCurrentTime();
+ }
+ dealWithEventMap();
+ }
+
+ private void scanExistingFile() {
+ LOGGER.info("test123 qqqq");
+ List<BasicFileInfo> fileInfos =
FileScanner.scanTaskBetweenTimes(cosClient, bucketName, originPattern,
+ taskProfile.getCycleUnit(), timeOffset, startTime, endTime,
retry);
+ LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
+ fileInfos.forEach((fileInfo) -> {
+ addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
+ if (retry) {
+ instanceCount++;
+ }
+ });
+ }
+
+ private boolean isInEventMap(String fileName, String dataTime) {
+ Map<String, InstanceProfile> fileToProfile = eventMap.get(dataTime);
+ if (fileToProfile == null) {
+ return false;
+ }
+ return fileToProfile.get(fileName) != null;
+ }
+
+ private void dealWithEventMap() {
+ removeTimeoutEvent(eventMap, retry);
+ dealWithEventMapWithCycle();
+ }
+
+ private void dealWithEventMapWithCycle() {
+ long startScanTime = startTime;
+ long endScanTime = endTime;
+ List<String> dataTimeList = Scanner.getDataTimeList(startScanTime,
endScanTime, taskProfile.getCycleUnit(),
+ timeOffset, retry);
+ if (dataTimeList.isEmpty()) {
+ LOGGER.error("get dataTimeList return empty list");
+ return;
+ }
+ Set<String> dealtDataTime = new HashSet<>();
+ // normal task first handle current data time
+ if (!retry) {
+ String current = dataTimeList.remove(dataTimeList.size() - 1);
+ dealtDataTime.add(current);
+ if (!dealEventMapByDataTime(current, true)) {
+ return;
+ }
+ }
+ dataTimeList.forEach(dataTime -> {
+ dealtDataTime.add(dataTime);
+ if (!dealEventMapByDataTime(dataTime, false)) {
+ return;
+ }
+ });
+ for (String dataTime : eventMap.keySet()) {
+ if (!dealtDataTime.contains(dataTime)) {
+ dealEventMapByDataTime(dataTime, false);
+ }
+ }
+ }
+
+ private boolean dealEventMapByDataTime(String dataTime, boolean
isCurrentDataTime) {
+ Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.get(dataTime);
+ if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) {
+ return true;
+ }
+ if (shouldStartNow(dataTime)) {
+ Set<InstanceProfile> sortedEvents = new
TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId));
+ sortedEvents.addAll(sameDataTimeEvents.values());
+ for (InstanceProfile sortEvent : sortedEvents) {
+ String fileName = sortEvent.getInstanceId();
+ InstanceProfile profile = sameDataTimeEvents.get(fileName);
+ if (!isCurrentDataTime && isFull()) {
+ return false;
+ }
+ if (!instanceQueue.offer(profile)) {
+ return false;
+ }
+ sameDataTimeEvents.remove(fileName);
+ }
+ }
+ return true;
+ }
+
+ /*
+ * Calculate whether the event needs to be processed at the current time
based on its data time, business cycle, and
+ * offset
+ */
+ private boolean shouldStartNow(String dataTime) {
+ String shouldStartTime =
+ NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), timeOffset);
+ String currentTime = getCurrentTime();
+ return currentTime.compareTo(shouldStartTime) >= 0;
+ }
+
+ private void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>>
eventMap, boolean isRetry) {
+ if (isRetry) {
+ return;
+ }
+ for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
+ /* If the data time of the event is within 2 days before (after)
the current time, it is valid */
+ String dataTime = entry.getKey();
+ if (!NewDateUtils.isValidCreationTime(dataTime,
DAY_TIMEOUT_INTERVAL)) {
+ /* Remove it from memory map. */
+ eventMap.remove(dataTime);
+ LOGGER.warn("remove too old event from event map. dataTime
{}", dataTime);
+ }
+ }
+ }
+
+ private String getCurrentTime() {
+ SimpleDateFormat dateFormat = new
SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT);
+ TimeZone timeZone =
TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE);
+ dateFormat.setTimeZone(timeZone);
+ return dateFormat.format(new Date(System.currentTimeMillis()));
+ }
+
+ private void addToEvenMap(String fileName, String dataTime) {
+ if (isInEventMap(fileName, dataTime)) {
+ LOGGER.info("add to evenMap isInEventMap returns true skip taskId
{} dataTime {} fileName {}",
+ taskProfile.getTaskId(), dataTime, fileName);
+ return;
+ }
+ LOGGER.info("test123 {}", cosClient);
+ ObjectMetadata meta = cosClient.getObjectMetadata(bucketName,
fileName);
+ Long fileUpdateTime = meta.getLastModified().getTime();
+ if (!shouldAddAgain(fileName, fileUpdateTime)) {
+ LOGGER.info("add to evenMap shouldAddAgain returns false skip
taskId {} dataTime {} fileName {}",
+ taskProfile.getTaskId(), dataTime, fileName);
+ return;
+ }
+ Map<String, InstanceProfile> sameDataTimeEvents =
eventMap.computeIfAbsent(dataTime,
+ mapKey -> new ConcurrentHashMap<>());
+ boolean containsInMemory = sameDataTimeEvents.containsKey(fileName);
+ if (containsInMemory) {
+ LOGGER.error("should not happen! may be {} has been deleted and
add again", fileName);
+ return;
+ }
+ String cycleUnit = taskProfile.getCycleUnit();
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_COS_INSTANCE,
+ fileName, cycleUnit, dataTime, fileUpdateTime);
+ sameDataTimeEvents.put(fileName, instanceProfile);
+ LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
new file mode 100644
index 0000000000..4eac0eeff2
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/cos/FileScanner.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task.cos;
+
+import org.apache.inlong.agent.plugin.utils.regex.PatternUtil;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner;
+import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.exception.CosClientException;
+import com.qcloud.cos.exception.CosServiceException;
+import com.qcloud.cos.model.COSObjectSummary;
+import com.qcloud.cos.model.ListObjectsRequest;
+import com.qcloud.cos.model.ObjectListing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/*
+ * This class is mainly used for scanning log file that we want to read. We
use this class at
+ * inlong_agent recover process, the do and redo tasks and the current log
file access when we deploy a
+ * new data source.
+ */
+public class FileScanner {
+
+ public static final int DEFAULT_KEY_COUNT = 100;
+ public static final String DEFAULT_DELIMITER = "/";
+ public static final char PATH_SEP = '/';
+
+ public static class BasicFileInfo {
+
+ public String fileName;
+ public String dataTime;
+
+ public BasicFileInfo(String fileName, String dataTime) {
+ this.fileName = fileName;
+ this.dataTime = dataTime;
+ }
+ }
+
+ private static final Logger logger =
LoggerFactory.getLogger(FileScanner.class);
+
+ public static List<BasicFileInfo> scanTaskBetweenTimes(COSClient
cosClient, String bucketName, String originPattern,
+ String cycleUnit, String timeOffset, long startTime, long endTime,
boolean isRetry) {
+ List<FinalPatternInfo> finalPatternInfos =
Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset,
+ startTime, endTime, isRetry);
+ List<BasicFileInfo> infos = new ArrayList<>();
+ for (FinalPatternInfo finalPatternInfo : finalPatternInfos) {
+ String prefix =
PatternUtil.getBeforeFirstWildcard(finalPatternInfo.finalPattern);
+ Pattern pattern = Pattern.compile(finalPatternInfo.finalPattern,
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL |
Pattern.MULTILINE);
+ List<BasicFileInfo> fileInfos = scanTaskInOneCycle(cosClient,
bucketName, pattern, prefix,
+ finalPatternInfo.dataTime, cycleUnit);
+ infos.addAll(fileInfos);
+ }
+ return infos;
+ }
+
+ public static List<BasicFileInfo> scanTaskInOneCycle(COSClient cosClient,
String bucketName, Pattern pattern,
+ String prefix, Long dataTime, String cycleUnit) {
+ List<BasicFileInfo> infos = new ArrayList<>();
+ ObjectListing objectListing;
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+ do {
+ try {
+ listObjectsRequest.setBucketName(bucketName);
+ listObjectsRequest.setPrefix(prefix);
+ listObjectsRequest.setDelimiter(DEFAULT_DELIMITER);
+ listObjectsRequest.setMaxKeys(DEFAULT_KEY_COUNT);
+ objectListing = cosClient.listObjects(listObjectsRequest);
+ } catch (CosServiceException e) {
+ logger.error("scanTaskInOneCycle finalPattern {}
CosServiceException", pattern.pattern(), e);
+ return infos;
+ } catch (CosClientException e) {
+ logger.error("scanTaskInOneCycle finalPattern {}
CosClientException", pattern.pattern(), e);
+ return infos;
+ }
+ List<String> commonPrefixes = objectListing.getCommonPrefixes();
+ int depth;
+ Pattern patternByDepth = null;
+ if (!commonPrefixes.isEmpty()) {
+ depth = countCharacterOccurrences(commonPrefixes.get(0),
PATH_SEP);
+ String temp = findNthOccurrenceSubstring(pattern.pattern(),
PATH_SEP, depth);
+ patternByDepth = Pattern.compile(temp,
Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+ }
+ for (String commonPrefix : commonPrefixes) {
+ Matcher matcher = patternByDepth.matcher(commonPrefix);
+ if (matcher.matches()) {
+ infos.addAll(scanTaskInOneCycle(cosClient, bucketName,
pattern, commonPrefix, dataTime, cycleUnit));
+ }
+ }
+ List<COSObjectSummary> cosObjectSummaries =
objectListing.getObjectSummaries();
+ for (COSObjectSummary cosObjectSummary : cosObjectSummaries) {
+ String key = cosObjectSummary.getKey();
+ Matcher matcher = pattern.matcher(key);
+ if (matcher.lookingAt()) {
+ long fileSize = cosObjectSummary.getSize();
+ String storageClasses = cosObjectSummary.getStorageClass();
+ infos.add(new BasicFileInfo(key,
+ DateTransUtils.millSecConvertToTimeStr(dataTime,
cycleUnit)));
+ String strDataTime =
DateTransUtils.millSecConvertToTimeStr(dataTime, cycleUnit);
+ logger.info("list key {} dataTime {} size {},
storageClasses {}", key, strDataTime, fileSize,
+ storageClasses);
+ }
+ }
+ String nextMarker = objectListing.getNextMarker();
+ listObjectsRequest.setMarker(nextMarker);
+ } while (objectListing.isTruncated());
+ return infos;
+ }
+
+ public static int countCharacterOccurrences(String input, char targetChar)
{
+ if (input == null) {
+ throw new IllegalArgumentException("Input string cannot be null");
+ }
+ int count = 0;
+
+ for (int i = 0; i < input.length(); i++) {
+ if (input.charAt(i) == targetChar) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public static String findNthOccurrenceSubstring(String input, char
targetChar, int n) {
+ int endIndex = findNthOccurrence(input, targetChar, n);
+ if (endIndex != -1) {
+ return input.substring(0, endIndex + 1);
+ } else {
+ return null;
+ }
+ }
+
+ public static int findNthOccurrence(String input, char targetChar, int n) {
+ int currentIndex = -1;
+ for (int i = 0; i < n; i++) {
+ currentIndex = input.indexOf(targetChar, currentIndex + 1);
+ if (currentIndex == -1) {
+ return -1;
+ }
+ }
+ return currentIndex;
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java
new file mode 100644
index 0000000000..259d330776
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSConfigHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.ClientConfig;
+
+// For some private, customized extension processing
+public abstract class COSConfigHandler {
+
+ abstract public ClientConfig getClientConfig(String region);
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java
new file mode 100644
index 0000000000..111e94212f
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/COSUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.COSClient;
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.auth.BasicCOSCredentials;
+import com.qcloud.cos.auth.COSCredentials;
+
+public class COSUtils {
+
+ public static COSClient createCli(String secretId, String secretKey,
String region) {
+ COSCredentials cred = new BasicCOSCredentials(secretId, secretKey);
+ COSConfigHandler configHandler = new DefaultCOSConfigHandler();
+ ClientConfig clientConfig = configHandler.getClientConfig(region);
+ return new COSClient(cred, clientConfig);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java
new file mode 100644
index 0000000000..a9dda478a2
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/cos/DefaultCOSConfigHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils.cos;
+
+import com.qcloud.cos.ClientConfig;
+import com.qcloud.cos.region.Region;
+
+public class DefaultCOSConfigHandler extends COSConfigHandler {
+
+ public ClientConfig getClientConfig(String region) {
+ return new ClientConfig(new Region(region));
+ }
+}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 33a51c916a..cd2f529072 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -541,6 +541,7 @@ MIT licenses
The following components are provided under MIT license. See project link for
details.
The text of each license is also included at licenses/LICENSE-[project].txt.
+ com.qcloud:cos_api:jar:5.6.54 - cos-java-sdk
(https://github.com/tencentyun/cos-java-sdk-v5), (MIT License)
net.sourceforge.argparse4j:argparse4j:0.7.0 - argparse4j
(https://github.com/argparse4j/argparse4j/tree/argparse4j-0.7.0), (MIT)
org.bouncycastle:bcpkix-jdk15on:1.69 - Bouncy Castle PKIX, CMS, EAC, TSP,
PKCS, OCSP, CMP, and CRMF APIs (https://www.bouncycastle.org/java.html), (MIT
License)
org.bouncycastle:bcprov-ext-jdk15on:1.69 - Bouncy Castle Provider
(https://www.bouncycastle.org/java.html), (MIT License)
diff --git a/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt
b/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt
new file mode 100644
index 0000000000..4b32321f98
--- /dev/null
+++ b/licenses/inlong-agent/licenses/LICENSE-cos-java-sdk.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 腾讯云
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4330360033..0562af4dd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,7 @@
<HikariCP.version>4.0.3</HikariCP.version>
<caffeine.version>2.9.3</caffeine.version>
<kafka.clients.version>3.0.0</kafka.clients.version>
+ <cos.sdk.version>5.6.54</cos.sdk.version>
<paho.client.version>1.2.5</paho.client.version>
<kubernetes.client.version>6.0.0</kubernetes.client.version>