This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d31c887dfa [INLONG-11605][Audit] Added audit data legitimacy verification (#11610)
d31c887dfa is described below

commit d31c887dfa9e6733e0dcc5f564e7c08d64588114
Author: doleyzi <[email protected]>
AuthorDate: Wed Dec 18 15:10:17 2024 +0800

    [INLONG-11605][Audit] Added audit data legitimacy verification (#11610)
---
 .../apache/inlong/audit/protocol/AuditData.java    | 114 +--------------------
 .../org/apache/inlong/audit/utils/DataUtils.java   |  48 +++++++++
 .../apache/inlong/audit/utils/DataUtilsTest.java   | 101 ++++++++++++++++++
 .../inlong/audit/source/ServerMessageHandler.java  |   7 +-
 .../inlong/audit/store/config/JdbcConfig.java      |   2 +
 .../inlong/audit/store/metric/MetricDimension.java |   3 +-
 .../inlong/audit/store/metric/MetricItem.java      |   2 +
 .../inlong/audit/store/metric/MetricsManager.java  |   3 +
 .../metric/prometheus/StorePrometheusMetric.java   |   3 +-
 .../inlong/audit/store/service/JdbcService.java    |  33 ++++++
 .../audit/store/service/consume/PulsarConsume.java |  26 ++---
 .../MetricItem.java => utils/PulsarUtils.java}     |  34 +++---
 12 files changed, 221 insertions(+), 155 deletions(-)

diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
index 85bde1d2d5..1efa200693 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.audit.protocol;
 
+import lombok.Data;
+
+@Data
 public class AuditData {
 
     private String ip;
@@ -33,115 +36,4 @@ public class AuditData {
     private long size;
     private long delay;
     private long auditVersion;
-
-    public long getAuditVersion() {
-        return auditVersion;
-    }
-
-    public void setAuditVersion(long auditVersion) {
-        this.auditVersion = auditVersion;
-    }
-
-    public String getIp() {
-        return ip;
-    }
-
-    public void setIp(String ip) {
-        this.ip = ip;
-    }
-
-    public String getDockerId() {
-        return dockerId;
-    }
-
-    public void setDockerId(String dockerId) {
-        this.dockerId = dockerId;
-    }
-
-    public String getThreadId() {
-        return threadId;
-    }
-
-    public void setThreadId(String threadId) {
-        this.threadId = threadId;
-    }
-
-    public long getSdkTs() {
-        return sdkTs;
-    }
-
-    public void setSdkTs(long sdkTs) {
-        this.sdkTs = sdkTs;
-    }
-
-    public long getPacketId() {
-        return packetId;
-    }
-
-    public void setPacketId(long packetId) {
-        this.packetId = packetId;
-    }
-
-    public long getLogTs() {
-        return logTs;
-    }
-
-    public void setLogTs(long logTs) {
-        this.logTs = logTs;
-    }
-
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    public String getInlongStreamId() {
-        return inlongStreamId;
-    }
-
-    public void setInlongStreamId(String inlongStreamId) {
-        this.inlongStreamId = inlongStreamId;
-    }
-
-    public String getAuditId() {
-        return auditId;
-    }
-
-    public void setAuditId(String auditId) {
-        this.auditId = auditId;
-    }
-
-    public String getAuditTag() {
-        return auditTag;
-    }
-
-    public void setAuditTag(String auditTag) {
-        this.auditTag = auditTag;
-    }
-    public long getCount() {
-        return count;
-    }
-
-    public void setCount(long count) {
-        this.count = count;
-    }
-
-    public long getSize() {
-        return size;
-    }
-
-    public void setSize(long size) {
-        this.size = size;
-    }
-
-    public long getDelay() {
-        return delay;
-    }
-
-    public void setDelay(long delay) {
-        this.delay = delay;
-    }
 }
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java
new file mode 100644
index 0000000000..76f6ee9510
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class DataUtils {
+
+    private static final int MAX_AUDIT_ITEM_LENGTH = 256;
+
+    /**
+     * Checks if the timestamp is within the specified deviation range.
+     *
+     * @param dataTime  The timestamp to check.
+     * @param deviation The allowed time deviation.
+     * @return true if the timestamp is within the deviation range, false otherwise.
+     */
+    public static boolean isDataTimeValid(long dataTime, long deviation) {
+        long currentTime = System.currentTimeMillis();
+        long timeDiff = Math.abs(currentTime - dataTime);
+        return timeDiff <= deviation;
+    }
+
+    /**
+     * Checks if the audit item is valid.
+     *
+     * @param auditItem The audit item to check.
+     * @return true if the audit item is blank or its length is less than the maximum length, false otherwise.
+     */
+    public static boolean isAuditItemValid(String auditItem) {
+        return StringUtils.isBlank(auditItem) || auditItem.length() < MAX_AUDIT_ITEM_LENGTH;
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java b/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java
new file mode 100644
index 0000000000..6b31bac1a3
--- /dev/null
+++ b/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DataUtilsTest {
+
+    @Test
+    public void testIsDataTimeValid() {
+        long deviation = 604800000;
+        long dataTime = System.currentTimeMillis();
+        boolean valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertTrue(valid);
+
+        dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 2;
+        valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertTrue(valid);
+
+        dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 2;
+        valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertTrue(valid);
+
+        dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 8;
+        valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertFalse(valid);
+
+        dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 8;
+        valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertFalse(valid);
+
+        dataTime = 1734356619540000L;
+        valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertFalse(valid);
+
+        dataTime = 1L;
+        valid = DataUtils.isDataTimeValid(dataTime, deviation);
+        assertFalse(valid);
+    }
+
+    @Test
+    public void testIsAuditItemValid() {
+        String auditItem = null;
+        boolean valid = DataUtils.isAuditItemValid(auditItem);
+        assertTrue(valid);
+
+        auditItem = "";
+        valid = DataUtils.isAuditItemValid(auditItem);
+        assertTrue(valid);
+
+        auditItem = "1@dff";
+        valid = DataUtils.isAuditItemValid(auditItem);
+        assertTrue(valid);
+
+        auditItem = "fb320c7e51";
+        valid = DataUtils.isAuditItemValid(auditItem);
+        assertTrue(valid);
+
+        auditItem = "127.0.0.1";
+        valid = DataUtils.isAuditItemValid(auditItem);
+        assertTrue(valid);
+
+        Random random = new Random();
+        StringBuilder stringBuilder128 = new StringBuilder(128);
+        for (int i = 0; i < 128; i++) {
+            char c = (char) (random.nextInt(26) + 'a');
+            stringBuilder128.append(c);
+        }
+        valid = DataUtils.isAuditItemValid(stringBuilder128.toString());
+        assertTrue(valid);
+
+        StringBuilder stringBuilder256 = new StringBuilder(256);
+        for (int i = 0; i < 256; i++) {
+            char c = (char) (random.nextInt(26) + 'a');
+            stringBuilder256.append(c);
+        }
+        valid = DataUtils.isAuditItemValid(stringBuilder256.toString());
+        assertFalse(valid);
+
+    }
+}
\ No newline at end of file
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 7595586dd4..d792610002 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -161,12 +161,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         LOGGER.debug("Receive message count: {}", auditRequest.getMsgBodyCount());
         for (AuditMessageBody auditMessageBody : bodyList) {
             long msgDays = messageDays(auditMessageBody.getLogTs());
-            if (msgDays >= this.msgValidThresholdDays) {
-                LOGGER.debug("Discard the data as it is from {} days ago, only the data with a log timestamp"
-                        + " less than {} days is valid", msgDays, this.msgValidThresholdDays);
-
+            if (Math.abs(msgDays) >= this.msgValidThresholdDays) {
+                LOGGER.debug("Discard the invalid audit data: {}", auditMessageBody);
                 MetricsManager.getInstance().addReceiveCountExpired(1);
-
                 continue;
             }
             AuditData auditData = new AuditData();
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
index 0b6c7c36bb..7b059817d4 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
@@ -41,4 +41,6 @@ public class JdbcConfig {
     private int processIntervalMs;
     @Value("${audit.store.data.queue.size:1000000}")
     private int dataQueueSize;
+    @Value("${audit.store.valid.datatime.range.ms:604800000}")
+    private long validDataTimeRangeMs;
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
index 02c2258dbc..baa4bf21bb 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
@@ -23,7 +23,8 @@ public enum MetricDimension {
     RECEIVE_FAILED("receiveFailed"),
     SEND_COUNT_SUCCESS("sendCountSuccess"),
     SEND_COUNT_FAILED("sendCountFailed"),
-    SEND_DURATION("sendDuration");
+    SEND_DURATION("sendDuration"),
+    INVALID_DATA("invalidData");
 
     private final String key;
 
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
index 0e5dd9ad18..848f79b5db 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
@@ -30,11 +30,13 @@ public class MetricItem {
     private AtomicLong sendCountSuccess = new AtomicLong(0);
     private AtomicLong sendCountFailed = new AtomicLong(0);
     private AtomicLong sendDuration = new AtomicLong(0);
+    private AtomicLong invalidData = new AtomicLong(0);
     public void resetAllMetrics() {
         receiveCountSuccess.set(0);
         receiveFailed.set(0);
         sendCountSuccess.set(0);
         sendCountFailed.set(0);
         sendDuration.set(0);
+        invalidData.set(0);
     }
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
index 68b69609cf..305c05df35 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
@@ -87,6 +87,9 @@ public class MetricsManager {
         metricItem.getSendCountFailed().addAndGet(count);
         metricItem.getSendDuration().addAndGet(duration);
     }
+    public void addInvalidData() {
+        metricItem.getInvalidData().addAndGet(1);
+    }
 
     public void shutdown() {
         timer.shutdown();
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
index bb72b7a178..8f1ad2d2c1 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
@@ -64,7 +64,8 @@ public class StorePrometheusMetric extends Collector implements AbstractMetric {
                 createSample(MetricDimension.RECEIVE_FAILED, metricItem.getReceiveFailed().doubleValue()),
                 createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()),
                 createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()),
-                createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue()));
+                createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue()),
+                createSample(MetricDimension.INVALID_DATA, metricItem.getInvalidData().doubleValue()));
 
         MetricFamilySamples metricFamilySamples =
                 new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples);
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
index 68ba584d73..fd1314c258 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
@@ -21,6 +21,8 @@ import org.apache.inlong.audit.protocol.AuditData;
 import org.apache.inlong.audit.store.config.JdbcConfig;
 import org.apache.inlong.audit.store.entities.JdbcDataPo;
 import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.utils.PulsarUtils;
+import org.apache.inlong.audit.utils.DataUtils;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
@@ -185,6 +187,12 @@ public class JdbcService implements InsertData, AutoCloseable {
 
     @Override
     public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) {
+        if (!isAuditDataValid(msgBody)) {
+            MetricsManager.getInstance().addInvalidData();
+            PulsarUtils.acknowledge(consumer, messageId);
+            LOG.error("Invalid audit data: {} ", msgBody);
+            return;
+        }
         JdbcDataPo data = new JdbcDataPo();
         data.setConsumer(consumer);
         data.setMessageId(messageId);
@@ -230,4 +238,29 @@ public class JdbcService implements InsertData, AutoCloseable {
             }
         }
     }
+
+    private boolean isAuditDataValid(AuditData auditData) {
+        // Check if any of the timestamp fields are within the valid range
+        if (!isDataTimeValid(auditData)) {
+            return false;
+        }
+        // Check if any of the audit items are valid
+        return isAuditItemValid(auditData);
+    }
+
+    private boolean isDataTimeValid(AuditData auditData) {
+        long validDataTimeRangeMs = jdbcConfig.getValidDataTimeRangeMs();
+        return DataUtils.isDataTimeValid(auditData.getLogTs(), validDataTimeRangeMs) &&
+                DataUtils.isDataTimeValid(auditData.getSdkTs(), validDataTimeRangeMs);
+    }
+
+    private boolean isAuditItemValid(AuditData auditData) {
+        return DataUtils.isAuditItemValid(auditData.getInlongGroupId()) &&
+                DataUtils.isAuditItemValid(auditData.getInlongStreamId()) &&
+                DataUtils.isAuditItemValid(auditData.getAuditId()) &&
+                DataUtils.isAuditItemValid(auditData.getAuditTag()) &&
+                DataUtils.isAuditItemValid(auditData.getIp()) &&
+                DataUtils.isAuditItemValid(auditData.getDockerId()) &&
+                DataUtils.isAuditItemValid(auditData.getThreadId());
+    }
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
index c1a5fe92f2..e90ebfc25f 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.store.config.MessageQueueConfig;
 import org.apache.inlong.audit.store.config.StoreConfig;
 import org.apache.inlong.audit.store.metric.MetricsManager;
 import org.apache.inlong.audit.store.service.InsertData;
+import org.apache.inlong.audit.store.utils.PulsarUtils;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
@@ -128,29 +129,14 @@ public class PulsarConsume extends BaseConsume {
                         .messageListener(new MessageListener<byte[]>() {
 
                             public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
+                                String body = null;
                                 try {
-                                    String body = new String(msg.getData(), StandardCharsets.UTF_8);
+                                    body = new String(msg.getData(), StandardCharsets.UTF_8);
                                     handleMessage(body, consumer, msg.getMessageId());
-                                } catch (Exception e) {
+                                } catch (Exception exception) {
                                     MetricsManager.getInstance().addReceiveFailed(1);
-
-                                    LOG.error("Consumer has exception topic {}, subName {}, ex {}",
-                                            topic,
-                                            mqConfig.getPulsarConsumerSubName(),
-                                            e);
-                                    if (mqConfig.isPulsarConsumerEnableRetry()) {
-                                        try {
-                                            consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
-                                        } catch (PulsarClientException pulsarClientException) {
-                                            LOG.error("Consumer reconsumeLater has exception "
-                                                    + "topic {}, subName {}, ex {}",
-                                                    topic,
-                                                    mqConfig.getPulsarConsumerSubName(),
-                                                    pulsarClientException);
-                                        }
-                                    } else {
-                                        consumer.negativeAcknowledge(msg);
-                                    }
+                                    PulsarUtils.acknowledge(consumer, msg.getMessageId());
+                                    LOG.error("Invalid audit data: {}", body, exception);
                                 }
                             }
                         })
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java
similarity index 51%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java
index 0e5dd9ad18..882ee425a0 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java
@@ -15,26 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.store.metric;
+package org.apache.inlong.audit.store.utils;
 
-import lombok.Data;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicLong;
+public class PulsarUtils {
 
-@Data
-public class MetricItem {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarUtils.class);
 
-    public static final String K_DIMENSION_KEY = "dimensionName";
-    private AtomicLong receiveCountSuccess = new AtomicLong(0);
-    private AtomicLong receiveFailed = new AtomicLong(0);
-    private AtomicLong sendCountSuccess = new AtomicLong(0);
-    private AtomicLong sendCountFailed = new AtomicLong(0);
-    private AtomicLong sendDuration = new AtomicLong(0);
-    public void resetAllMetrics() {
-        receiveCountSuccess.set(0);
-        receiveFailed.set(0);
-        sendCountSuccess.set(0);
-        sendCountFailed.set(0);
-        sendDuration.set(0);
+    public static void acknowledge(Consumer<byte[]> consumer, MessageId messageId) {
+        if (consumer == null) {
+            return;
+        }
+        try {
+            consumer.acknowledge(messageId);
+        } catch (Exception exception) {
+            LOG.error("Acknowledge topic:{}, consumer name:{}, message id: {} has exception ",
+                    consumer.getTopic(), consumer.getConsumerName(), messageId, exception);
+        }
     }
 }

Reply via email to