This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2ea25400f [INLONG-6960][Agent] Fix repeat uuid generation for
different messages (#6967)
2ea25400f is described below
commit 2ea25400f881a42b343cbe48aaa5debfbccc17aa
Author: xueyingzhang <[email protected]>
AuthorDate: Wed Dec 21 14:57:28 2022 +0800
[INLONG-6960][Agent] Fix repeat uuid generation for different messages
(#6967)
---
.../inlong/agent/plugin/message/SequentialID.java | 78 +++++++++-----------
.../inlong/agent/plugin/sinks/SenderManager.java | 3 +-
.../agent/plugin/message/SequentialIDTest.java | 33 +++++++++
.../org/apache/inlong/common/util/SnowFlake.java | 84 ++++++++++++++++++++++
4 files changed, 153 insertions(+), 45 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
index 879e42e57..a551530ee 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/SequentialID.java
@@ -18,10 +18,14 @@
package org.apache.inlong.agent.plugin.message;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.util.SnowFlake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.InetAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
+import java.net.UnknownHostException;
+
+import static org.apache.inlong.common.util.SnowFlake.MAX_MACHINE_NUM;
/**
* Generate uniq sequential id, reset base id if max number
@@ -29,32 +33,35 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class SequentialID {
- private static final int MAX_ID = 2000000000;
- private static final String IP_HEX = getHex();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SequentialID.class);
+
private static SequentialID uniqueSequentialID = null;
- private final AtomicInteger id = new AtomicInteger(1);
- private final ReentrantLock idLock = new ReentrantLock();
- private final AtomicInteger uid = new AtomicInteger(1);
- private final ReentrantLock uidLock = new ReentrantLock();
+ private SnowFlake snowFlake;
private SequentialID() {
-
+ long machineId = ipStr2Int(AgentUtils.getLocalIp()); // machine id is derived from the string returned by AgentUtils.getLocalIp()
+ snowFlake = new SnowFlake(machineId); // the SnowFlake constructor throws IllegalArgumentException for ids outside [0, MAX_MACHINE_NUM]
}
- private static String getHex() {
- String localIp = AgentUtils.getLocalIp();
+ private long ipStr2Int(String ip) { // converts an IP or host string into a number used as the SnowFlake machine id
+ long result = 0;
+ InetAddress ipv;
try {
- InetAddress ia = InetAddress.getByName(localIp);
- byte[] hostBytes = ia.getAddress();
- StringBuilder sb = new StringBuilder();
- for (byte hostByte : hostBytes) {
- sb.append(String.format("%02x", hostByte));
- }
- return sb.toString();
- } catch (Exception e) {
- // ignore it
+ ipv = InetAddress.getByName(ip);
+ } catch (UnknownHostException e) {
+ LOGGER.error("convert ip to int error", e);
+ return AgentUtils.getRandomBySeed(10); // fallback when the address cannot be resolved; NOTE(review): presumably a random value bounded by the argument - confirm in AgentUtils
}
- return "00000000";
+ for (byte b : ipv.getAddress()) {
+ result = result << 8 | (b & 0xFF); // fold the address bytes into one long, first byte most significant; the mask reads each byte as unsigned
+ }
+ if (result < 0) { // NOTE(review): only reachable when the address has 8 or more bytes, since the sign bit needs at least 8 shifts to be set
+ result = AgentUtils.getRandomBySeed(10);
+ }
+ if (result > MAX_MACHINE_NUM) {
+ result %= MAX_MACHINE_NUM; // reduces the value into [0, MAX_MACHINE_NUM - 1]; MAX_MACHINE_NUM itself is never produced by this branch
+ }
+ return result;
}
/**
@@ -63,36 +70,19 @@ public class SequentialID {
public static synchronized SequentialID getInstance() {
if (uniqueSequentialID == null) {
- uniqueSequentialID = new SequentialID();
- }
- return uniqueSequentialID;
- }
-
- public int getNextId() {
- idLock.lock();
- try {
- if (id.get() > MAX_ID) {
- id.set(1);
+ synchronized (SequentialID.class) { // NOTE(review): the method is already static synchronized, which locks the same class monitor, so this inner lock and re-check look redundant
+ if (uniqueSequentialID == null) {
+ uniqueSequentialID = new SequentialID(); // lazily create the single shared instance
+ }
}
- return id.incrementAndGet();
- } finally {
- idLock.unlock();
}
+ return uniqueSequentialID;
}
/**
* get next uuid
*/
public String getNextUuid() {
- uidLock.lock();
- try {
- if (uid.get() > MAX_ID) {
- uid.set(1);
- }
- return IP_HEX + "-" + String.format("%014x-%08x",
- System.currentTimeMillis(), uid.incrementAndGet());
- } finally {
- uidLock.unlock();
- }
+ return String.valueOf(snowFlake.nextId()); // the uuid is now the decimal string of a SnowFlake long instead of the removed hex ip-time-counter format
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index a58ac78e1..db475be35 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -278,7 +278,8 @@ public class SenderManager {
try {
SendResult result =
selectSender(groupId).sendMessage(batchMessage.getDataList(), groupId, streamId,
- dataTime, "", maxSenderTimeout, TimeUnit.SECONDS,
batchMessage.getExtraMap(), proxySend);
+ dataTime, SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout,
TimeUnit.SECONDS,
+ batchMessage.getExtraMap(), proxySend);
metricItem.pluginSendCount.addAndGet(msgCnt);
if (result == SendResult.OK) {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/message/SequentialIDTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/message/SequentialIDTest.java
new file mode 100644
index 000000000..a4bbb4365
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/message/SequentialIDTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.message;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+public class SequentialIDTest { // unit test for SequentialID.getNextUuid()
+
+ @Test
+ public void testUUID() { // two consecutive ids taken from the shared instance must not be equal
+ String uid1 = SequentialID.getInstance().getNextUuid();
+ String uid2 = SequentialID.getInstance().getNextUuid();
+ assertNotEquals(uid1, uid2);
+ }
+
+}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/util/SnowFlake.java
b/inlong-common/src/main/java/org/apache/inlong/common/util/SnowFlake.java
new file mode 100644
index 000000000..fcfc23911
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/util/SnowFlake.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.util;
+
+public class SnowFlake { // generates long ids composed of a millisecond timestamp offset, a machine id and a per-millisecond sequence
+
+ // start timestamp: epoch offset in milliseconds, subtracted from the current time when an id is built
+ private static final long START_STMP = 1480166465631L;
+
+ private static final long SEQUENCE_BIT = 9; // bits reserved for the sequence part
+ private static final long MACHINE_BIT = 13; // bits reserved for the machine id part
+
+ public static final long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT); // low 13 bits set = 8191, largest accepted machine id
+ private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT); // low 9 bits set = 511, also used as the wrap-around mask
+
+ private static final long MACHINE_LEFT = SEQUENCE_BIT; // machine id is shifted left past the sequence bits
+ private static final long TIMESTMP_LEFT = SEQUENCE_BIT + MACHINE_BIT; // timestamp is shifted left past sequence and machine bits (22)
+
+ private long machineId; // assigned once in the constructor
+ private long sequence = 0L; // counter within the current millisecond
+ private long lastStmp = -1L; // timestamp used for the previously generated id, -1 before the first call
+
+ public SnowFlake(long machineId) { // rejects machine ids outside [0, MAX_MACHINE_NUM]
+ if (machineId > MAX_MACHINE_NUM || machineId < 0) {
+ throw new IllegalArgumentException( // NOTE(review): the message below joins the value and the text without any separator
+ machineId + "machineId can't be greater than
MAX_MACHINE_NUM or less than 0 MAX_MACHINE_NUM"
+ + MAX_MACHINE_NUM);
+ }
+ this.machineId = machineId;
+ }
+
+ /**
+ * Generate the next id as (now - START_STMP) << 22 | machineId << 9 | sequence; throws RuntimeException when the clock reads earlier than the last used timestamp.
+ */
+ public synchronized long nextId() {
+ long currStmp = getNewstmp();
+ if (currStmp < lastStmp) {
+ throw new RuntimeException("Clock moved backwards. Refusing to
generate id");
+ }
+
+ if (currStmp == lastStmp) { // another id requested within the same millisecond
+ sequence = (sequence + 1) & MAX_SEQUENCE; // increment and wrap to 0 after 511
+ if (sequence == 0L) { // sequence exhausted for this millisecond
+ currStmp = getNextMill(); // wait for a later millisecond so the id stays unique
+ }
+ } else {
+ sequence = 0L; // new millisecond: restart the sequence
+ }
+
+ lastStmp = currStmp;
+
+ return (currStmp - START_STMP) << TIMESTMP_LEFT
+ | machineId << MACHINE_LEFT
+ | sequence;
+ }
+
+ private long getNextMill() { // busy-waits until the clock reads a value greater than lastStmp
+ long mill = getNewstmp();
+ while (mill <= lastStmp) {
+ mill = getNewstmp();
+ }
+ return mill;
+ }
+
+ private long getNewstmp() { // current wall-clock time in milliseconds
+ return System.currentTimeMillis();
+ }
+
+}