justinwwhuang commented on code in PR #9984:
URL: https://github.com/apache/inlong/pull/9984#discussion_r1567012836
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java:
##########
@@ -141,183 +76,101 @@ public void init(InstanceProfile profile) {
timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
pulsarClient =
PulsarClient.builder().serviceUrl(serviceUrl).build();
isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
-
- EXECUTOR_SERVICE.execute(run());
+ consumer = getConsumer();
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + topic, ex);
}
}
- private Runnable run() {
- return () -> {
- AgentThreadFactory.nameThread("pulsar-source-" + taskId + "-" +
instanceId);
- running = true;
- try {
- try (Consumer<byte[]> consumer =
pulsarClient.newConsumer(Schema.BYTES)
- .topic(topic)
- .subscriptionName(subscription)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
-
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
- .subscribe()) {
-
- if (!isRestoreFromDB && timestamp != 0L) {
- consumer.seek(timestamp);
- LOGGER.info("Reset consume from {}", timestamp);
- } else {
- LOGGER.info("Skip to reset consume");
- }
-
- doRun(consumer);
- }
- } catch (Throwable e) {
- LOGGER.error("do run error maybe pulsar client is configured
incorrectly: ", e);
- }
- running = false;
- };
- }
-
- private void doRun(Consumer<byte[]> consumer) throws PulsarClientException
{
- long lastPrintTime = 0;
- while (isRunnable()) {
- boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- if (!suc) {
- break;
- }
- org.apache.pulsar.client.api.Message<byte[]> message =
consumer.receive(0, TimeUnit.MILLISECONDS);
- if (ObjectUtils.isEmpty(message)) {
- if (queue.isEmpty()) {
- emptyCount.incrementAndGet();
- } else {
- emptyCount.set(0);
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- AgentUtils.silenceSleepInSeconds(1);
- continue;
- }
- emptyCount.set(0);
- long offset = 0L;
- SourceData sourceData = new SourceData(message.getValue(), 0L);
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, message.getValue().length);
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(sourceData);
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- consumer.acknowledge(message);
-
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
- }
- }
+ @Override
+ protected String getThreadName() {
+ return "pulsar-source-" + taskId + "-" + instanceId;
}
- public boolean isRunnable() {
- return runnable;
+ @Override
+ protected boolean doPrepareToRead() {
+ return true;
}
- private boolean waitForPermit(String permitName, int permitLen) {
- boolean suc = false;
- while (!suc) {
- suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
- if (!suc) {
- MemoryManager.getInstance().printDetail(permitName, "log file
source");
- if (!isRunnable()) {
- return false;
- }
- AgentUtils.silenceSleepInSeconds(1);
- }
+ @Override
+ protected List<SourceData> readFromSource() {
+ List<SourceData> dataList = new ArrayList<>();
+ org.apache.pulsar.client.api.Message<byte[]> message = null;
+ try {
+ message = consumer.receive(0, TimeUnit.MILLISECONDS);
+ offset = message.getSequenceId();
+ } catch (PulsarClientException e) {
+ LOGGER.error("read from pulsar error", e);
}
- return true;
+ if (!ObjectUtils.isEmpty(message)) {
+ dataList.add(new SourceData(message.getValue(), 0L));
+ }
+ try {
+ consumer.acknowledge(message);
Review Comment:
In the future, this will be achieved by saving the message ID.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]