This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3f52c5c821 [INLONG-11298][Agent] Fix bug for pulsar source with empty
data process and specified time consumption (#11299)
3f52c5c821 is described below
commit 3f52c5c821e7fc391cd4d13f12b60325c6f8d127
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Oct 9 23:38:19 2024 +0800
[INLONG-11298][Agent] Fix bug for pulsar source with empty data process and
specified time consumption (#11299)
* [INLONG-11298][Agent] Fix bugs in the Pulsar source
* [INLONG-11298][Agent] Fix the logic inconsistency between the page and the
backend code
---
.../inlong/agent/constant/CommonConstants.java | 2 +-
.../agent/message/file/ProxyMessageCache.java | 19 ---------
.../agent/core/instance/InstanceManager.java | 2 +-
.../inlong/agent/plugin/sources/PulsarSource.java | 47 +++++++++++++---------
.../agent/plugin/sources/file/AbstractSource.java | 8 ++--
5 files changed, 34 insertions(+), 44 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 45320406ef..53a5bd976c 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -59,7 +59,7 @@ public class CommonConstants {
public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
public static final String PROXY_BATCH_FLUSH_INTERVAL =
"proxy.batch.flush.interval";
- public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100;
+ public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1;
public static final String PROXY_SENDER_MAX_TIMEOUT =
"proxy.sender.maxTimeout";
// max timeout in seconds.
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
index c7b151a26c..7e2aa28034 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
@@ -32,14 +32,11 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
@@ -56,11 +53,8 @@ public class ProxyMessageCache {
private final int maxPackSize;
private final int maxQueueNumber;
private final String groupId;
- // ms
- private final int cacheTimeout;
// streamId -> list of proxyMessage
private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>>
messageQueueMap;
- private final AtomicLong cacheSize = new AtomicLong(0);
private long lastPrintTime = 0;
private long dataTime;
private boolean isRealTime = false;
@@ -76,7 +70,6 @@ public class ProxyMessageCache {
this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
this.maxQueueNumber =
instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
- this.cacheTimeout =
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS,
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
messageQueueMap = new ConcurrentHashMap<>();
dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
@@ -109,7 +102,6 @@ public class ProxyMessageCache {
return false;
}
messageQueue.put(message);
- cacheSize.addAndGet(message.getBody().length);
return true;
} catch (Exception ex) {
LOGGER.error("exception caught", ex);
@@ -159,13 +151,11 @@ public class ProxyMessageCache {
if (peekMessageLength > maxPackSize) {
LOGGER.warn("message size is {}, greater than max pack size
{}, drop it!",
peekMessage.getBody().length, maxPackSize);
- cacheSize.addAndGet(-bodySize);
messageQueue.remove();
break;
}
resultBatchSize += bodySize;
// decrease queue size.
- cacheSize.addAndGet(-bodySize);
bodyList.add(message.getBody());
offsetList.add(message.getAckInfo());
}
@@ -183,13 +173,4 @@ public class ProxyMessageCache {
}
return null;
}
-
- public Map<String, String> getExtraMap() {
- return extraMap;
- }
-
- public long getCacheSize() {
- return cacheSize.get();
- }
-
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 48aedfd09d..06dd20a99e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -317,7 +317,7 @@ public class InstanceManager extends AbstractDaemon {
}
private void addInstance(InstanceProfile profile) {
- if (instanceMap.size() >= instanceLimit) {
+ if (instanceMap.size() > instanceLimit) {
LOGGER.error("instanceMap size {} over limit {}",
instanceMap.size(), instanceLimit);
actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 56dec44d55..949cf5a4c7 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -50,10 +50,10 @@ public class PulsarSource extends AbstractSource {
private String serviceUrl;
private String subscription;
private String subscriptionType;
- private String subscriptionPosition;
private PulsarClient pulsarClient;
private Long timestamp;
private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
+ private final static String SUBSCRIPTION_CUSTOM = "Custom";
private boolean isRestoreFromDB = false;
private Consumer<byte[]> consumer;
private long offset = 0L;
@@ -68,8 +68,6 @@ public class PulsarSource extends AbstractSource {
topic = profile.getInstanceId();
serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
subscription = profile.get(TASK_PULSAR_SUBSCRIPTION,
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
- subscriptionPosition =
profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION,
- SubscriptionInitialPosition.Latest.name());
subscriptionType = profile.get(TASK_PULSAR_SUBSCRIPTION_TYPE,
SubscriptionType.Shared.name());
timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
pulsarClient =
PulsarClient.builder().serviceUrl(serviceUrl).build();
@@ -97,35 +95,48 @@ public class PulsarSource extends AbstractSource {
org.apache.pulsar.client.api.Message<byte[]> message = null;
try {
message = consumer.receive(0, TimeUnit.MILLISECONDS);
- offset = message.getSequenceId();
} catch (PulsarClientException e) {
LOGGER.error("read from pulsar error", e);
}
if (!ObjectUtils.isEmpty(message)) {
+ offset = message.getSequenceId();
dataList.add(new SourceData(message.getValue(), new
String(message.getMessageId().toByteArray(),
StandardCharsets.UTF_8)));
+ try {
+ consumer.acknowledge(message);
+ } catch (PulsarClientException e) {
+ LOGGER.error("ack pulsar error", e);
+ }
}
- try {
- consumer.acknowledge(message);
- } catch (PulsarClientException e) {
- LOGGER.error("ack pulsar error", e);
- }
+
return dataList;
}
private Consumer<byte[]> getConsumer() {
Consumer<byte[]> consumer = null;
try {
- consumer = pulsarClient.newConsumer(Schema.BYTES)
- .topic(topic)
- .subscriptionName(subscription)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
-
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
- .subscribe();
- if (!isRestoreFromDB && timestamp != 0L) {
- consumer.seek(timestamp);
- LOGGER.info("Reset consume from {}", timestamp);
+ String position = profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION,
SubscriptionInitialPosition.Latest.name());
+ if (position.equals(SUBSCRIPTION_CUSTOM)) {
+ consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName(subscription)
+
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+ .subscribe();
+ if (!isRestoreFromDB) {
+ if (timestamp == 0L) {
+ LOGGER.error("Reset consume but timestamp is 0L");
+ } else {
+ consumer.seek(timestamp);
+ LOGGER.info("Reset consume from {}", timestamp);
+ }
+ }
} else {
+ consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName(subscription)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(position))
+
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+ .subscribe();
LOGGER.info("Skip to reset consume");
}
return consumer;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index f1fb8b5570..299c3829a0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -38,7 +38,6 @@ import org.apache.inlong.common.metric.MetricRegister;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,10 +54,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
@@ -356,9 +353,7 @@ public abstract class AbstractSource implements Source {
}
private Message createMessage(SourceData sourceData) {
- String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
- header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.getOffset());
header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
if (extendedHandler != null) {
@@ -424,6 +419,9 @@ public abstract class AbstractSource implements Source {
@Override
public boolean sourceFinish() {
+ if (isRealTime) {
+ return false;
+ }
return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}
}