This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d31c887dfa [INLONG-11605][Audit] Added audit data legitimacy
verification (#11610)
d31c887dfa is described below
commit d31c887dfa9e6733e0dcc5f564e7c08d64588114
Author: doleyzi <[email protected]>
AuthorDate: Wed Dec 18 15:10:17 2024 +0800
[INLONG-11605][Audit] Added audit data legitimacy verification (#11610)
---
.../apache/inlong/audit/protocol/AuditData.java | 114 +--------------------
.../org/apache/inlong/audit/utils/DataUtils.java | 48 +++++++++
.../apache/inlong/audit/utils/DataUtilsTest.java | 101 ++++++++++++++++++
.../inlong/audit/source/ServerMessageHandler.java | 7 +-
.../inlong/audit/store/config/JdbcConfig.java | 2 +
.../inlong/audit/store/metric/MetricDimension.java | 3 +-
.../inlong/audit/store/metric/MetricItem.java | 2 +
.../inlong/audit/store/metric/MetricsManager.java | 3 +
.../metric/prometheus/StorePrometheusMetric.java | 3 +-
.../inlong/audit/store/service/JdbcService.java | 33 ++++++
.../audit/store/service/consume/PulsarConsume.java | 26 ++---
.../MetricItem.java => utils/PulsarUtils.java} | 34 +++---
12 files changed, 221 insertions(+), 155 deletions(-)
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
index 85bde1d2d5..1efa200693 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java
@@ -17,6 +17,9 @@
package org.apache.inlong.audit.protocol;
+import lombok.Data;
+
+@Data
public class AuditData {
private String ip;
@@ -33,115 +36,4 @@ public class AuditData {
private long size;
private long delay;
private long auditVersion;
-
- public long getAuditVersion() {
- return auditVersion;
- }
-
- public void setAuditVersion(long auditVersion) {
- this.auditVersion = auditVersion;
- }
-
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- public String getDockerId() {
- return dockerId;
- }
-
- public void setDockerId(String dockerId) {
- this.dockerId = dockerId;
- }
-
- public String getThreadId() {
- return threadId;
- }
-
- public void setThreadId(String threadId) {
- this.threadId = threadId;
- }
-
- public long getSdkTs() {
- return sdkTs;
- }
-
- public void setSdkTs(long sdkTs) {
- this.sdkTs = sdkTs;
- }
-
- public long getPacketId() {
- return packetId;
- }
-
- public void setPacketId(long packetId) {
- this.packetId = packetId;
- }
-
- public long getLogTs() {
- return logTs;
- }
-
- public void setLogTs(long logTs) {
- this.logTs = logTs;
- }
-
- public String getInlongGroupId() {
- return inlongGroupId;
- }
-
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
- }
-
- public String getInlongStreamId() {
- return inlongStreamId;
- }
-
- public void setInlongStreamId(String inlongStreamId) {
- this.inlongStreamId = inlongStreamId;
- }
-
- public String getAuditId() {
- return auditId;
- }
-
- public void setAuditId(String auditId) {
- this.auditId = auditId;
- }
-
- public String getAuditTag() {
- return auditTag;
- }
-
- public void setAuditTag(String auditTag) {
- this.auditTag = auditTag;
- }
- public long getCount() {
- return count;
- }
-
- public void setCount(long count) {
- this.count = count;
- }
-
- public long getSize() {
- return size;
- }
-
- public void setSize(long size) {
- this.size = size;
- }
-
- public long getDelay() {
- return delay;
- }
-
- public void setDelay(long delay) {
- this.delay = delay;
- }
}
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java
new file mode 100644
index 0000000000..76f6ee9510
--- /dev/null
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class DataUtils {
+
+ private static final int MAX_AUDIT_ITEM_LENGTH = 256;
+
+ /**
+ * Checks if the timestamp is within the specified deviation range.
+ *
+ * @param dataTime The timestamp to check, in epoch milliseconds.
+ * @param deviation The allowed time deviation, in milliseconds.
+ * @return true if the timestamp is within the deviation range, false otherwise.
+ */
+ public static boolean isDataTimeValid(long dataTime, long deviation) {
+ long currentTime = System.currentTimeMillis();
+ long timeDiff = Math.abs(currentTime - dataTime);
+ return timeDiff <= deviation;
+ }
+
+ /**
+ * Checks if the audit item is valid.
+ *
+ * @param auditItem The audit item to check.
+ * @return true if the audit item is blank or its length is less than the maximum length, false otherwise.
+ */
+ public static boolean isAuditItemValid(String auditItem) {
+ return StringUtils.isBlank(auditItem) || auditItem.length() <
MAX_AUDIT_ITEM_LENGTH;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java
b/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java
new file mode 100644
index 0000000000..6b31bac1a3
--- /dev/null
+++
b/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DataUtilsTest {
+
+ @Test
+ public void testIsDataTimeValid() {
+ long deviation = 604800000;
+ long dataTime = System.currentTimeMillis();
+ boolean valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertTrue(valid);
+
+ dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 2;
+ valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertTrue(valid);
+
+ dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 2;
+ valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertTrue(valid);
+
+ dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 8;
+ valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertFalse(valid);
+
+ dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 8;
+ valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertFalse(valid);
+
+ dataTime = 1734356619540000L;
+ valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertFalse(valid);
+
+ dataTime = 1L;
+ valid = DataUtils.isDataTimeValid(dataTime, deviation);
+ assertFalse(valid);
+ }
+
+ @Test
+ public void testIsAuditItemValid() {
+ String auditItem = null;
+ boolean valid = DataUtils.isAuditItemValid(auditItem);
+ assertTrue(valid);
+
+ auditItem = "";
+ valid = DataUtils.isAuditItemValid(auditItem);
+ assertTrue(valid);
+
+ auditItem = "1@dff";
+ valid = DataUtils.isAuditItemValid(auditItem);
+ assertTrue(valid);
+
+ auditItem = "fb320c7e51";
+ valid = DataUtils.isAuditItemValid(auditItem);
+ assertTrue(valid);
+
+ auditItem = "127.0.0.1";
+ valid = DataUtils.isAuditItemValid(auditItem);
+ assertTrue(valid);
+
+ Random random = new Random();
+ StringBuilder stringBuilder128 = new StringBuilder(128);
+ for (int i = 0; i < 128; i++) {
+ char c = (char) (random.nextInt(26) + 'a');
+ stringBuilder128.append(c);
+ }
+ valid = DataUtils.isAuditItemValid(stringBuilder128.toString());
+ assertTrue(valid);
+
+ StringBuilder stringBuilder256 = new StringBuilder(256);
+ for (int i = 0; i < 256; i++) {
+ char c = (char) (random.nextInt(26) + 'a');
+ stringBuilder256.append(c);
+ }
+ valid = DataUtils.isAuditItemValid(stringBuilder256.toString());
+ assertFalse(valid);
+
+ }
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 7595586dd4..d792610002 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -161,12 +161,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
LOGGER.debug("Receive message count: {}",
auditRequest.getMsgBodyCount());
for (AuditMessageBody auditMessageBody : bodyList) {
long msgDays = messageDays(auditMessageBody.getLogTs());
- if (msgDays >= this.msgValidThresholdDays) {
- LOGGER.debug("Discard the data as it is from {} days ago, only
the data with a log timestamp"
- + " less than {} days is valid", msgDays,
this.msgValidThresholdDays);
-
+ if (Math.abs(msgDays) >= this.msgValidThresholdDays) {
+ LOGGER.debug("Discard the invalid audit data: {}",
auditMessageBody);
MetricsManager.getInstance().addReceiveCountExpired(1);
-
continue;
}
AuditData auditData = new AuditData();
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
index 0b6c7c36bb..7b059817d4 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java
@@ -41,4 +41,6 @@ public class JdbcConfig {
private int processIntervalMs;
@Value("${audit.store.data.queue.size:1000000}")
private int dataQueueSize;
+ @Value("${audit.store.valid.datatime.range.ms:604800000}")
+ private long validDataTimeRangeMs;
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
index 02c2258dbc..baa4bf21bb 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java
@@ -23,7 +23,8 @@ public enum MetricDimension {
RECEIVE_FAILED("receiveFailed"),
SEND_COUNT_SUCCESS("sendCountSuccess"),
SEND_COUNT_FAILED("sendCountFailed"),
- SEND_DURATION("sendDuration");
+ SEND_DURATION("sendDuration"),
+ INVALID_DATA("invalidData");
private final String key;
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
index 0e5dd9ad18..848f79b5db 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
@@ -30,11 +30,13 @@ public class MetricItem {
private AtomicLong sendCountSuccess = new AtomicLong(0);
private AtomicLong sendCountFailed = new AtomicLong(0);
private AtomicLong sendDuration = new AtomicLong(0);
+ private AtomicLong invalidData = new AtomicLong(0);
public void resetAllMetrics() {
receiveCountSuccess.set(0);
receiveFailed.set(0);
sendCountSuccess.set(0);
sendCountFailed.set(0);
sendDuration.set(0);
+ invalidData.set(0);
}
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
index 68b69609cf..305c05df35 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java
@@ -87,6 +87,9 @@ public class MetricsManager {
metricItem.getSendCountFailed().addAndGet(count);
metricItem.getSendDuration().addAndGet(duration);
}
+ public void addInvalidData() {
+ metricItem.getInvalidData().addAndGet(1);
+ }
public void shutdown() {
timer.shutdown();
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
index bb72b7a178..8f1ad2d2c1 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java
@@ -64,7 +64,8 @@ public class StorePrometheusMetric extends Collector
implements AbstractMetric {
createSample(MetricDimension.RECEIVE_FAILED,
metricItem.getReceiveFailed().doubleValue()),
createSample(MetricDimension.SEND_COUNT_SUCCESS,
metricItem.getSendCountSuccess().doubleValue()),
createSample(MetricDimension.SEND_COUNT_FAILED,
metricItem.getSendCountFailed().doubleValue()),
- createSample(MetricDimension.SEND_DURATION,
metricItem.getSendDuration().doubleValue()));
+ createSample(MetricDimension.SEND_DURATION,
metricItem.getSendDuration().doubleValue()),
+ createSample(MetricDimension.INVALID_DATA,
metricItem.getInvalidData().doubleValue()));
MetricFamilySamples metricFamilySamples =
new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE,
HELP_DESCRIPTION, samples);
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
index 68ba584d73..fd1314c258 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java
@@ -21,6 +21,8 @@ import org.apache.inlong.audit.protocol.AuditData;
import org.apache.inlong.audit.store.config.JdbcConfig;
import org.apache.inlong.audit.store.entities.JdbcDataPo;
import org.apache.inlong.audit.store.metric.MetricsManager;
+import org.apache.inlong.audit.store.utils.PulsarUtils;
+import org.apache.inlong.audit.utils.DataUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
@@ -185,6 +187,12 @@ public class JdbcService implements InsertData,
AutoCloseable {
@Override
public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId
messageId) {
+ if (!isAuditDataValid(msgBody)) {
+ MetricsManager.getInstance().addInvalidData();
+ PulsarUtils.acknowledge(consumer, messageId);
+ LOG.error("Invalid audit data: {} ", msgBody);
+ return;
+ }
JdbcDataPo data = new JdbcDataPo();
data.setConsumer(consumer);
data.setMessageId(messageId);
@@ -230,4 +238,29 @@ public class JdbcService implements InsertData,
AutoCloseable {
}
}
}
+
+ private boolean isAuditDataValid(AuditData auditData) {
+ // Reject the data unless all of its timestamp fields are within the valid range
+ if (!isDataTimeValid(auditData)) {
+ return false;
+ }
+ // Check that all of the audit items are valid
+ return isAuditItemValid(auditData);
+ }
+
+ private boolean isDataTimeValid(AuditData auditData) {
+ long validDataTimeRangeMs = jdbcConfig.getValidDataTimeRangeMs();
+ return DataUtils.isDataTimeValid(auditData.getLogTs(),
validDataTimeRangeMs) &&
+ DataUtils.isDataTimeValid(auditData.getSdkTs(),
validDataTimeRangeMs);
+ }
+
+ private boolean isAuditItemValid(AuditData auditData) {
+ return DataUtils.isAuditItemValid(auditData.getInlongGroupId()) &&
+ DataUtils.isAuditItemValid(auditData.getInlongStreamId()) &&
+ DataUtils.isAuditItemValid(auditData.getAuditId()) &&
+ DataUtils.isAuditItemValid(auditData.getAuditTag()) &&
+ DataUtils.isAuditItemValid(auditData.getIp()) &&
+ DataUtils.isAuditItemValid(auditData.getDockerId()) &&
+ DataUtils.isAuditItemValid(auditData.getThreadId());
+ }
}
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
index c1a5fe92f2..e90ebfc25f 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.audit.store.config.MessageQueueConfig;
import org.apache.inlong.audit.store.config.StoreConfig;
import org.apache.inlong.audit.store.metric.MetricsManager;
import org.apache.inlong.audit.store.service.InsertData;
+import org.apache.inlong.audit.store.utils.PulsarUtils;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
@@ -128,29 +129,14 @@ public class PulsarConsume extends BaseConsume {
.messageListener(new MessageListener<byte[]>() {
public void received(Consumer<byte[]> consumer,
Message<byte[]> msg) {
+ String body = null;
try {
- String body = new String(msg.getData(),
StandardCharsets.UTF_8);
+ body = new String(msg.getData(),
StandardCharsets.UTF_8);
handleMessage(body, consumer,
msg.getMessageId());
- } catch (Exception e) {
+ } catch (Exception exception) {
MetricsManager.getInstance().addReceiveFailed(1);
-
- LOG.error("Consumer has exception topic
{}, subName {}, ex {}",
- topic,
-
mqConfig.getPulsarConsumerSubName(),
- e);
- if
(mqConfig.isPulsarConsumerEnableRetry()) {
- try {
- consumer.reconsumeLater(msg, 10,
TimeUnit.SECONDS);
- } catch (PulsarClientException
pulsarClientException) {
- LOG.error("Consumer reconsumeLater
has exception "
- + "topic {}, subName {},
ex {}",
- topic,
-
mqConfig.getPulsarConsumerSubName(),
- pulsarClientException);
- }
- } else {
- consumer.negativeAcknowledge(msg);
- }
+ PulsarUtils.acknowledge(consumer,
msg.getMessageId());
+ LOG.error("Invalid audit data: {}", body,
exception);
}
}
})
diff --git
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java
similarity index 51%
copy from
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
copy to
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java
index 0e5dd9ad18..882ee425a0 100644
---
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java
+++
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.store.metric;
+package org.apache.inlong.audit.store.utils;
-import lombok.Data;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicLong;
+public class PulsarUtils {
-@Data
-public class MetricItem {
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarUtils.class);
- public static final String K_DIMENSION_KEY = "dimensionName";
- private AtomicLong receiveCountSuccess = new AtomicLong(0);
- private AtomicLong receiveFailed = new AtomicLong(0);
- private AtomicLong sendCountSuccess = new AtomicLong(0);
- private AtomicLong sendCountFailed = new AtomicLong(0);
- private AtomicLong sendDuration = new AtomicLong(0);
- public void resetAllMetrics() {
- receiveCountSuccess.set(0);
- receiveFailed.set(0);
- sendCountSuccess.set(0);
- sendCountFailed.set(0);
- sendDuration.set(0);
+ public static void acknowledge(Consumer<byte[]> consumer, MessageId
messageId) {
+ if (consumer == null) {
+ return;
+ }
+ try {
+ consumer.acknowledge(messageId);
+ } catch (Exception exception) {
+ LOG.error("Acknowledge topic:{}, consumer name:{}, message id: {}
has exception ",
+ consumer.getTopic(), consumer.getConsumerName(),
messageId, exception);
+ }
}
}