This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ba475c15c4 [INLONG-10115][Agent] Change the offset save format from long to string (#10116)
ba475c15c4 is described below

commit ba475c15c4083f0a8d8731ddeb702c9844635a57
Author: justinwwhuang <[email protected]>
AuthorDate: Mon May 6 10:37:14 2024 +0800

    [INLONG-10115][Agent] Change the offset save format from long to string (#10116)
---
 .../main/java/org/apache/inlong/agent/conf/OffsetProfile.java  | 10 +++++-----
 .../java/org/apache/inlong/agent/constant/TaskConstants.java   |  2 +-
 .../org/apache/inlong/agent/message/file/OffsetAckInfo.java    |  2 +-
 .../org/apache/inlong/agent/message/file/ProxyMessage.java     |  3 +--
 .../org/apache/inlong/agent/plugin/sources/KafkaSource.java    |  2 +-
 .../org/apache/inlong/agent/plugin/sources/LogFileSource.java  |  4 ++--
 .../org/apache/inlong/agent/plugin/sources/MongoDBSource.java  |  2 +-
 .../org/apache/inlong/agent/plugin/sources/PulsarSource.java   |  4 +++-
 .../inlong/agent/plugin/sources/file/AbstractSource.java       |  4 ++--
 .../agent/plugin/sinks/filecollect/TestSenderManager.java      |  2 +-
 .../apache/inlong/agent/plugin/sources/TestLogFileSource.java  |  2 +-
 11 files changed, 19 insertions(+), 18 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
index fb88145653..a2e0109758 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/OffsetProfile.java
@@ -42,7 +42,7 @@ public class OffsetProfile extends AbstractConfiguration {
     public OffsetProfile() {
     }
 
-    public OffsetProfile(String taskId, String instanceId, long offset, String 
inodeInfo) {
+    public OffsetProfile(String taskId, String instanceId, String offset, 
String inodeInfo) {
         setTaskId(taskId);
         setInstanceId(instanceId);
         setOffset(offset);
@@ -77,12 +77,12 @@ public class OffsetProfile extends AbstractConfiguration {
         setLong(TaskConstants.LAST_UPDATE_TIME, lastUpdateTime);
     }
 
-    public Long getOffset() {
-        return getLong(TaskConstants.OFFSET, TaskConstants.DEFAULT_OFFSET);
+    public String getOffset() {
+        return get(TaskConstants.OFFSET, TaskConstants.DEFAULT_OFFSET);
     }
 
-    public void setOffset(Long offset) {
-        setLong(TaskConstants.OFFSET, offset);
+    public void setOffset(String offset) {
+        set(TaskConstants.OFFSET, offset);
     }
 
     public String getInodeInfo() {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 34957fd65f..96669e1235 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -48,7 +48,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_MQ_CLUSTERS = "task.mqClusters";
     public static final String TASK_MQ_TOPIC = "task.topicInfo";
     public static final String OFFSET = "offset";
-    public static final Long DEFAULT_OFFSET = -1L;
+    public static final String DEFAULT_OFFSET = "-1L";
     public static final String INODE_INFO = "inodeInfo";
 
     // File task
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
index 146958aa3a..fa0864e477 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/OffsetAckInfo.java
@@ -24,7 +24,7 @@ import lombok.Data;
 @AllArgsConstructor
 public class OffsetAckInfo {
 
-    private Long offset;
+    private String offset;
     private int len;
     private Boolean hasAck;
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
index 6122ad59c4..12ebe4d0f3 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessage.java
@@ -50,8 +50,7 @@ public class ProxyMessage implements Message {
         this.dataKey = header.getOrDefault(PROXY_KEY_DATA, "");
         // use the batch key of user and inlongStreamId to determine one batch
         this.batchKey = dataKey + inlongStreamId;
-        Long offset = Long.parseLong(header.get(TaskConstants.OFFSET));
-        ackInfo = new OffsetAckInfo(offset, body.length, false);
+        ackInfo = new OffsetAckInfo(header.get(TaskConstants.OFFSET), 
body.length, false);
     }
 
     public ProxyMessage(Message message) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 0ca9b97def..a12233bc4b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -112,7 +112,7 @@ public class KafkaSource extends AbstractSource {
         List<SourceData> dataList = new ArrayList<>();
         ConsumerRecords<String, byte[]> records = 
kafkaConsumer.poll(Duration.ofMillis(1000));
         for (ConsumerRecord<String, byte[]> record : records) {
-            SourceData sourceData = new SourceData(record.value(), 
record.offset());
+            SourceData sourceData = new SourceData(record.value(), 
Long.toString(record.offset()));
             dataList.add(sourceData);
             offset = record.offset();
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index dd6d9ce146..a942854afe 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -130,7 +130,7 @@ public class LogFileSource extends AbstractSource {
         bytePosition = readLines(randomAccessFile, pos, lines, 
BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false);
         for (int i = 0; i < lines.size(); i++) {
             linePosition++;
-            dataList.add(new SourceData(lines.get(i), linePosition));
+            dataList.add(new SourceData(lines.get(i), 
Long.toString(linePosition)));
         }
         return dataList;
     }
@@ -148,7 +148,7 @@ public class LogFileSource extends AbstractSource {
     private long getInitLineOffset(boolean isIncrement, String taskId, String 
instanceId, String inodeInfo) {
         long offset = 0;
         if (offsetProfile != null && 
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
-            offset = offsetProfile.getOffset();
+            offset = Long.parseLong(offsetProfile.getOffset());
             int fileLineCount = getRealLineCount(instanceId);
             if (fileLineCount < offset) {
                 LOGGER.info("getInitLineOffset inode no change taskId {} file 
rotate, offset set to 0, file {}", taskId,
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
index d9cf63ee0e..f3f8fb7f15 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java
@@ -121,7 +121,7 @@ public class MongoDBSource extends AbstractSource {
             RecordCommitter<ChangeEvent<String, String>> committer) throws 
InterruptedException {
         boolean offerSuc = false;
         for (ChangeEvent<String, String> record : records) {
-            SourceData sourceData = new 
SourceData(record.value().getBytes(StandardCharsets.UTF_8), 0L);
+            SourceData sourceData = new 
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0L");
             while (isRunnable() && !offerSuc) {
                 offerSuc = debeziumQueue.offer(sourceData, 1, 
TimeUnit.SECONDS);
             }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 34a81f7d22..f7c63acba4 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -103,7 +104,8 @@ public class PulsarSource extends AbstractSource {
             LOGGER.error("read from pulsar error", e);
         }
         if (!ObjectUtils.isEmpty(message)) {
-            dataList.add(new SourceData(message.getValue(), 0L));
+            dataList.add(new SourceData(message.getValue(), new 
String(message.getMessageId().toByteArray(),
+                    StandardCharsets.UTF_8)));
         }
         try {
             consumer.acknowledge(message);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 2449d065e4..bc5225206a 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -77,7 +77,7 @@ public abstract class AbstractSource implements Source {
     protected class SourceData {
 
         private byte[] data;
-        private Long offset;
+        private String offset;
     }
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSource.class);
@@ -323,7 +323,7 @@ public abstract class AbstractSource implements Source {
         String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
-        header.put(OFFSET, sourceData.getOffset().toString());
+        header.put(OFFSET, sourceData.getOffset());
         header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
         if (extendedHandler != null) {
             extendedHandler.dealWithHeader(header, sourceData.getData());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 30945adfa9..897d996867 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -104,7 +104,7 @@ public class TestSenderManager {
                 List<OffsetAckInfo> ackInfoList = new ArrayList<>();
                 bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
                 for (int j = 0; j < bodyList.size(); j++) {
-                    OffsetAckInfo ackInfo = new OffsetAckInfo(offset++, 
bodyList.get(j).length, false);
+                    OffsetAckInfo ackInfo = new 
OffsetAckInfo(Long.toString(offset++), bodyList.get(j).length, false);
                     ackInfoList.add(ackInfo);
                     ackInfoListTotal.add(ackInfo);
                 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 1923dc5f1c..1cd3a2b927 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -92,7 +92,7 @@ public class TestLogFileSource {
             if (offset > 0) {
                 OffsetProfile offsetProfile = new 
OffsetProfile(instanceProfile.getTaskId(),
                         instanceProfile.getInstanceId(),
-                        offset, instanceProfile.get(INODE_INFO));
+                        Long.toString(offset), 
instanceProfile.get(INODE_INFO));
                 OffsetManager.getInstance().setOffset(offsetProfile);
             }
             source.init(instanceProfile);

Reply via email to