This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 5cf0b4311 [INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
5cf0b4311 is described below
commit 5cf0b4311bade4e203f8c2153d724376144de997
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Oct 12 18:15:31 2022 +0800
[INLONG-6156][DataProxy] Twice event write when topic is empty (#6157)
---
.../org/apache/inlong/dataproxy/sink/PulsarSink.java | 2 +-
.../apache/inlong/dataproxy/sink/pulsar/SinkTask.java | 17 ++++++++---------
2 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index c2014965d..b73223b11 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -498,7 +498,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
}
addStatistics(eventStat, false, 0);
eventStat.incRetryCnt();
- if (!eventStat.isOrderMessage() && needRetry) {
+ if (needRetry) {
processResendEvent(eventStat);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
index efbaedfb6..a0a488fa6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SinkTask.java
@@ -32,6 +32,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
import org.apache.inlong.dataproxy.sink.PulsarSink;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,6 +154,12 @@ public class SinkTask extends Thread {
logger.warn("Event is null!");
continue;
}
+ // check whether discard or send event
+ if (eventStat.getRetryCnt() > maxRetrySendCnt) {
+ logger.warn("Message will be discard! send times reach to
max retry cnt."
+ + " topic = {}, max retry cnt = {}", topic,
maxRetrySendCnt);
+ continue;
+ }
// get topic
topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
if (StringUtils.isEmpty(topic)) {
@@ -162,21 +169,13 @@ public class SinkTask extends Thread {
}
if (topic == null || topic.equals("")) {
pulsarSink.handleMessageSendException(topic, eventStat,
- new Exception(ConfigConstants.TOPIC_KEY + " info
is null"));
- processToReTrySend(eventStat);
- logger.warn("no topic specified, so will retry send!");
+ new NotFoundException(ConfigConstants.TOPIC_KEY +
" info is null"));
continue;
}
// check whether order-type message
if (eventStat.isOrderMessage()) {
sleep(1000);
}
- // check whether discard or send event
- if (eventStat.getRetryCnt() > maxRetrySendCnt) {
- logger.warn("Message will be discard! send times reach to
max retry cnt."
- + " topic = {}, max retry cnt = {}", topic,
maxRetrySendCnt);
- continue;
- }
// check whether duplicated event
String clientSeqId =
event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
if (pulsarConfig.getClientIdCache() && clientSeqId != null) {