This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea25400f [INLONG-6960][Agent] Fix repeat uuid generation for different messages (#6967)
2ea25400f is described below

commit 2ea25400f881a42b343cbe48aaa5debfbccc17aa
Author: xueyingzhang <[email protected]>
AuthorDate: Wed Dec 21 14:57:28 2022 +0800

    [INLONG-6960][Agent] Fix repeat uuid generation for different messages (#6967)
---
 .../inlong/agent/plugin/message/SequentialID.java  | 78 +++++++++-----------
 .../inlong/agent/plugin/sinks/SenderManager.java   |  3 +-
 .../agent/plugin/message/SequentialIDTest.java     | 33 +++++++++
 .../org/apache/inlong/common/util/SnowFlake.java   | 84 ++++++++++++++++++++++
 4 files changed, 153 insertions(+), 45 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
index 879e42e57..a551530ee 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
@@ -18,10 +18,14 @@
 package org.apache.inlong.agent.plugin.message;
 
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.util.SnowFlake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
+import java.net.UnknownHostException;
+
+import static org.apache.inlong.common.util.SnowFlake.MAX_MACHINE_NUM;
 
 /**
  * Generate uniq sequential id, reset base id if max number
@@ -29,32 +33,35 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class SequentialID {
 
-    private static final int MAX_ID = 2000000000;
-    private static final String IP_HEX = getHex();
+    private static final Logger LOGGER = LoggerFactory.getLogger(SequentialID.class);
+
     private static SequentialID uniqueSequentialID = null;
-    private final AtomicInteger id = new AtomicInteger(1);
-    private final ReentrantLock idLock = new ReentrantLock();
-    private final AtomicInteger uid = new AtomicInteger(1);
-    private final ReentrantLock uidLock = new ReentrantLock();
+    private SnowFlake snowFlake;
 
     private SequentialID() {
-
+        long machineId = ipStr2Int(AgentUtils.getLocalIp());
+        snowFlake = new SnowFlake(machineId);
     }
 
-    private static String getHex() {
-        String localIp = AgentUtils.getLocalIp();
+    private long ipStr2Int(String ip) {
+        long result = 0;
+        InetAddress ipv;
         try {
-            InetAddress ia = InetAddress.getByName(localIp);
-            byte[] hostBytes = ia.getAddress();
-            StringBuilder sb = new StringBuilder();
-            for (byte hostByte : hostBytes) {
-                sb.append(String.format("%02x", hostByte));
-            }
-            return sb.toString();
-        } catch (Exception e) {
-            // ignore it
+            ipv = InetAddress.getByName(ip);
+        } catch (UnknownHostException e) {
+            LOGGER.error("convert ip to int error", e);
+            return AgentUtils.getRandomBySeed(10);
         }
-        return "00000000";
+        for (byte b : ipv.getAddress()) {
+            result = result << 8 | (b & 0xFF);
+        }
+        if (result < 0) {
+            result = AgentUtils.getRandomBySeed(10);
+        }
+        if (result > MAX_MACHINE_NUM) {
+            result %= MAX_MACHINE_NUM;
+        }
+        return result;
     }
 
     /**
@@ -63,36 +70,19 @@ public class SequentialID {
     public static synchronized SequentialID getInstance() {
 
         if (uniqueSequentialID == null) {
-            uniqueSequentialID = new SequentialID();
-        }
-        return uniqueSequentialID;
-    }
-
-    public int getNextId() {
-        idLock.lock();
-        try {
-            if (id.get() > MAX_ID) {
-                id.set(1);
+            synchronized (SequentialID.class) {
+                if (uniqueSequentialID == null) {
+                    uniqueSequentialID = new SequentialID();
+                }
             }
-            return id.incrementAndGet();
-        } finally {
-            idLock.unlock();
         }
+        return uniqueSequentialID;
     }
 
     /**
      * get next uuid
      */
     public String getNextUuid() {
-        uidLock.lock();
-        try {
-            if (uid.get() > MAX_ID) {
-                uid.set(1);
-            }
-            return IP_HEX + "-" + String.format("%014x-%08x",
-                    System.currentTimeMillis(), uid.incrementAndGet());
-        } finally {
-            uidLock.unlock();
-        }
+        return String.valueOf(snowFlake.nextId());
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index a58ac78e1..db475be35 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -278,7 +278,8 @@ public class SenderManager {
 
         try {
             SendResult result = selectSender(groupId).sendMessage(batchMessage.getDataList(), groupId, streamId,
-                    dataTime, "", maxSenderTimeout, TimeUnit.SECONDS, batchMessage.getExtraMap(), proxySend);
+                    dataTime, SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
+                    batchMessage.getExtraMap(), proxySend);
             metricItem.pluginSendCount.addAndGet(msgCnt);
 
             if (result == SendResult.OK) {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/message/SequentialIDTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/message/SequentialIDTest.java
new file mode 100644
index 000000000..a4bbb4365
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/message/SequentialIDTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.message;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+public class SequentialIDTest {
+
+    @Test
+    public void testUUID() {
+        String uid1 = SequentialID.getInstance().getNextUuid();
+        String uid2 = SequentialID.getInstance().getNextUuid();
+        assertNotEquals(uid1, uid2);
+    }
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/util/SnowFlake.java b/inlong-common/src/main/java/org/apache/inlong/common/util/SnowFlake.java
new file mode 100644
index 000000000..fcfc23911
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/util/SnowFlake.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.util;
+
+public class SnowFlake {
+
+    // start timestamp
+    private static final long START_STMP = 1480166465631L;
+
+    private static final long SEQUENCE_BIT = 9;
+    private static final long MACHINE_BIT = 13;
+
+    public static final long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
+    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
+
+    private static final long MACHINE_LEFT = SEQUENCE_BIT;
+    private static final long TIMESTMP_LEFT = SEQUENCE_BIT + MACHINE_BIT;
+
+    private long machineId;
+    private long sequence = 0L;
+    private long lastStmp = -1L;
+
+    public SnowFlake(long machineId) {
+        if (machineId > MAX_MACHINE_NUM || machineId < 0) {
+            throw new IllegalArgumentException(
+                    machineId + "machineId can't be greater than MAX_MACHINE_NUM or less than 0 MAX_MACHINE_NUM"
+                            + MAX_MACHINE_NUM);
+        }
+        this.machineId = machineId;
+    }
+
+    /**
+     * generate nextId
+     */
+    public synchronized long nextId() {
+        long currStmp = getNewstmp();
+        if (currStmp < lastStmp) {
+            throw new RuntimeException("Clock moved backwards.  Refusing to generate id");
+        }
+
+        if (currStmp == lastStmp) {
+            sequence = (sequence + 1) & MAX_SEQUENCE;
+            if (sequence == 0L) {
+                currStmp = getNextMill();
+            }
+        } else {
+            sequence = 0L;
+        }
+
+        lastStmp = currStmp;
+
+        return (currStmp - START_STMP) << TIMESTMP_LEFT
+                | machineId << MACHINE_LEFT
+                | sequence;
+    }
+
+    private long getNextMill() {
+        long mill = getNewstmp();
+        while (mill <= lastStmp) {
+            mill = getNewstmp();
+        }
+        return mill;
+    }
+
+    private long getNewstmp() {
+        return System.currentTimeMillis();
+    }
+
+}

Reply via email to