This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ba475c15c4 [INLONG-10115][Agent] Change the offset save format from long to string (#10116)
ba475c15c4 is described below
commit ba475c15c4083f0a8d8731ddeb702c9844635a57
Author: justinwwhuang <[email protected]>
AuthorDate: Mon May 6 10:37:14 2024 +0800
[INLONG-10115][Agent] Change the offset save format from long to string (#10116)
---
.../main/java/org/apache/inlong/agent/conf/OffsetProfile.java | 10 +++++-----
.../java/org/apache/inlong/agent/constant/TaskConstants.java | 2 +-
.../org/apache/inlong/agent/message/file/OffsetAckInfo.java | 2 +-
.../org/apache/inlong/agent/message/file/ProxyMessage.java | 3 +--
.../org/apache/inlong/agent/plugin/sources/KafkaSource.java | 2 +-
.../org/apache/inlong/agent/plugin/sources/LogFileSource.java | 4 ++--
.../org/apache/inlong/agent/plugin/sources/MongoDBSource.java | 2 +-
.../org/apache/inlong/agent/plugin/sources/PulsarSource.java | 4 +++-
.../inlong/agent/plugin/sources/file/AbstractSource.java | 4 ++--
.../agent/plugin/sinks/filecollect/TestSenderManager.java | 2 +-
.../apache/inlong/agent/plugin/sources/TestLogFileSource.java | 2 +-
11 files changed, 19 insertions(+), 18 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
index fb88145653..a2e0109758 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
@@ -42,7 +42,7 @@ public class OffsetProfile extends AbstractConfiguration {
public OffsetProfile() {
}
- public OffsetProfile(String taskId, String instanceId, long offset, String
inodeInfo) {
+ public OffsetProfile(String taskId, String instanceId, String offset,
String inodeInfo) {
setTaskId(taskId);
setInstanceId(instanceId);
setOffset(offset);
@@ -77,12 +77,12 @@ public class OffsetProfile extends AbstractConfiguration {
setLong(TaskConstants.LAST_UPDATE_TIME, lastUpdateTime);
}
- public Long getOffset() {
- return getLong(TaskConstants.OFFSET, TaskConstants.DEFAULT_OFFSET);
+ public String getOffset() {
+ return get(TaskConstants.OFFSET, TaskConstants.DEFAULT_OFFSET);
}
- public void setOffset(Long offset) {
- setLong(TaskConstants.OFFSET, offset);
+ public void setOffset(String offset) {
+ set(TaskConstants.OFFSET, offset);
}
public String getInodeInfo() {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 34957fd65f..96669e1235 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -48,7 +48,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_MQ_CLUSTERS = "task.mqClusters";
public static final String TASK_MQ_TOPIC = "task.topicInfo";
public static final String OFFSET = "offset";
- public static final Long DEFAULT_OFFSET = -1L;
+ public static final String DEFAULT_OFFSET = "-1L";
public static final String INODE_INFO = "inodeInfo";
// File task
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
index 146958aa3a..fa0864e477 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
@@ -24,7 +24,7 @@ import lombok.Data;
@AllArgsConstructor
public class OffsetAckInfo {
- private Long offset;
+ private String offset;
private int len;
private Boolean hasAck;
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
index 6122ad59c4..12ebe4d0f3 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
@@ -50,8 +50,7 @@ public class ProxyMessage implements Message {
this.dataKey = header.getOrDefault(PROXY_KEY_DATA, "");
// use the batch key of user and inlongStreamId to determine one batch
this.batchKey = dataKey + inlongStreamId;
- Long offset = Long.parseLong(header.get(TaskConstants.OFFSET));
- ackInfo = new OffsetAckInfo(offset, body.length, false);
+ ackInfo = new OffsetAckInfo(header.get(TaskConstants.OFFSET),
body.length, false);
}
public ProxyMessage(Message message) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 0ca9b97def..a12233bc4b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -112,7 +112,7 @@ public class KafkaSource extends AbstractSource {
List<SourceData> dataList = new ArrayList<>();
ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, byte[]> record : records) {
- SourceData sourceData = new SourceData(record.value(),
record.offset());
+ SourceData sourceData = new SourceData(record.value(),
Long.toString(record.offset()));
dataList.add(sourceData);
offset = record.offset();
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index dd6d9ce146..a942854afe 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -130,7 +130,7 @@ public class LogFileSource extends AbstractSource {
bytePosition = readLines(randomAccessFile, pos, lines,
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
for (int i = 0; i < lines.size(); i++) {
linePosition++;
- dataList.add(new SourceData(lines.get(i), linePosition));
+ dataList.add(new SourceData(lines.get(i),
Long.toString(linePosition)));
}
return dataList;
}
@@ -148,7 +148,7 @@ public class LogFileSource extends AbstractSource {
private long getInitLineOffset(boolean isIncrement, String taskId, String
instanceId, String inodeInfo) {
long offset = 0;
if (offsetProfile != null &&
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
- offset = offsetProfile.getOffset();
+ offset = Long.parseLong(offsetProfile.getOffset());
int fileLineCount = getRealLineCount(instanceId);
if (fileLineCount < offset) {
LOGGER.info("getInitLineOffset inode no change taskId {} file
rotate, offset set to 0, file {}", taskId,
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index d9cf63ee0e..f3f8fb7f15 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -121,7 +121,7 @@ public class MongoDBSource extends AbstractSource {
RecordCommitter<ChangeEvent<String, String>> committer) throws
InterruptedException {
boolean offerSuc = false;
for (ChangeEvent<String, String> record : records) {
- SourceData sourceData = new
SourceData(record.value().getBytes(StandardCharsets.UTF_8), 0L);
+ SourceData sourceData = new
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0L");
while (isRunnable() && !offerSuc) {
offerSuc = debeziumQueue.offer(sourceData, 1,
TimeUnit.SECONDS);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 34a81f7d22..f7c63acba4 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -103,7 +104,8 @@ public class PulsarSource extends AbstractSource {
LOGGER.error("read from pulsar error", e);
}
if (!ObjectUtils.isEmpty(message)) {
- dataList.add(new SourceData(message.getValue(), 0L));
+ dataList.add(new SourceData(message.getValue(), new
String(message.getMessageId().toByteArray(),
+ StandardCharsets.UTF_8)));
}
try {
consumer.acknowledge(message);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 2449d065e4..bc5225206a 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -77,7 +77,7 @@ public abstract class AbstractSource implements Source {
protected class SourceData {
private byte[] data;
- private Long offset;
+ private String offset;
}
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSource.class);
@@ -323,7 +323,7 @@ public abstract class AbstractSource implements Source {
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
- header.put(OFFSET, sourceData.getOffset().toString());
+ header.put(OFFSET, sourceData.getOffset());
header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
if (extendedHandler != null) {
extendedHandler.dealWithHeader(header, sourceData.getData());
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 30945adfa9..897d996867 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -104,7 +104,7 @@ public class TestSenderManager {
List<OffsetAckInfo> ackInfoList = new ArrayList<>();
bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
for (int j = 0; j < bodyList.size(); j++) {
- OffsetAckInfo ackInfo = new OffsetAckInfo(offset++,
bodyList.get(j).length, false);
+ OffsetAckInfo ackInfo = new
OffsetAckInfo(Long.toString(offset++), bodyList.get(j).length, false);
ackInfoList.add(ackInfo);
ackInfoListTotal.add(ackInfo);
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 1923dc5f1c..1cd3a2b927 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -92,7 +92,7 @@ public class TestLogFileSource {
if (offset > 0) {
OffsetProfile offsetProfile = new
OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
- offset, instanceProfile.get(INODE_INFO));
+ Long.toString(offset),
instanceProfile.get(INODE_INFO));
OffsetManager.getInstance().setOffset(offsetProfile);
}
source.init(instanceProfile);