This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bd1010743 [INLONG-9804][Agent] Add Pulsar source for Agent (#9805)
5bd1010743 is described below

commit 5bd1010743ab8efb66959039fa00e0761a48e28c
Author: haifxu <[email protected]>
AuthorDate: Wed Mar 13 21:32:26 2024 +0800

    [INLONG-9804][Agent] Add Pulsar source for Agent (#9805)
    
    Co-authored-by: Charles Zhang <[email protected]>
---
 .../inlong/agent/constant/TaskConstants.java       |  10 ++
 .../org/apache/inlong/agent/pojo/PulsarTask.java   |  46 ++++++
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  29 ++++
 .../agent/plugin/instance/PulsarInstance.java      | 183 +++++++++++++++++++++
 .../inlong/agent/plugin/sources/KafkaSource.java   |   1 -
 .../{KafkaSource.java => PulsarSource.java}        | 180 +++++++++-----------
 .../inlong/agent/plugin/task/PulsarTask.java       | 177 ++++++++++++++++++++
 7 files changed, 519 insertions(+), 107 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index b1fea01366..138e5ee78f 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -117,6 +117,16 @@ public class TaskConstants extends CommonConstants {
     public static final String JOB_KAFKA_READ_TIMEOUT = 
"job.kafkaJob.read.timeout";
     public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = 
"task.kafkaJob.autoOffsetReset";
 
+    // Pulsar task
+    public static final String TASK_PULSAR_TENANT = "task.pulsarTask.tenant";
+    public static final String TASK_PULSAR_NAMESPACE = 
"task.pulsarTask.namespace";
+    public static final String TASK_PULSAR_TOPIC = "task.pulsarTask.topic";
+    public static final String TASK_PULSAR_SUBSCRIPTION = 
"task.pulsarTask.subscription";
+    public static final String TASK_PULSAR_SUBSCRIPTION_TYPE = 
"task.pulsarTask.subscriptionType";
+    public static final String TASK_PULSAR_SERVICE_URL = 
"task.pulsarTask.serviceUrl";
+    public static final String TASK_PULSAR_SUBSCRIPTION_POSITION = 
"task.pulsarTask.subscriptionPosition";
+    public static final String TASK_PULSAR_RESET_TIME = 
"task.pulsarTask.resetTime";
+
     public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
     public static final String JOB_MONGO_USER = "job.mongoJob.user";
     public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PulsarTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PulsarTask.java
new file mode 100644
index 0000000000..fdff14f9f2
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PulsarTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class PulsarTask {
+
+    private String tenant;
+    private String namespace;
+    private String topic;
+    private String subscription;
+    private String subscriptionType;
+    private String serviceUrl;
+    private String subscriptionPosition;
+    private String resetTime;
+
+    @Data
+    public static class PulsarTaskConfig {
+
+        private String pulsarTenant;
+        private String namespace;
+        private String topic;
+        private String subscription;
+        private String subscriptionType;
+        private String serviceUrl;
+        private String subscriptionPosition;
+        private String resetTime;
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index d080ee734c..f3fcc927ed 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
 import org.apache.inlong.agent.pojo.FileTask.Line;
+import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -39,6 +40,7 @@ public class TaskProfileDto {
 
     public static final String DEFAULT_FILE_TASK = 
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
     public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
+    public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -57,6 +59,8 @@ public class TaskProfileDto {
      * kafka source
      */
     public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource";
+    // pulsar source
+    public static final String PULSAR_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PulsarSource";
     /**
      * PostgreSQL source
      */
@@ -209,6 +213,23 @@ public class TaskProfileDto {
         return kafkaJob;
     }
 
+    private static PulsarTask getPulsarTask(DataConfig dataConfig) {
+        PulsarTaskConfig pulsarTaskConfig = 
GSON.fromJson(dataConfig.getExtParams(),
+                PulsarTaskConfig.class);
+        PulsarTask pulsarTask = new PulsarTask();
+
+        pulsarTask.setTenant(pulsarTaskConfig.getPulsarTenant());
+        pulsarTask.setNamespace(pulsarTaskConfig.getNamespace());
+        pulsarTask.setTopic(pulsarTaskConfig.getTopic());
+        pulsarTask.setSubscription(pulsarTaskConfig.getSubscription());
+        pulsarTask.setSubscriptionType(pulsarTaskConfig.getSubscriptionType());
+        pulsarTask.setServiceUrl(pulsarTaskConfig.getServiceUrl());
+        
pulsarTask.setSubscriptionPosition(pulsarTaskConfig.getSubscriptionPosition());
+        pulsarTask.setResetTime(pulsarTaskConfig.getResetTime());
+
+        return pulsarTask;
+    }
+
     private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) {
         PostgreSQLJob.PostgreSQLJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
                 PostgreSQLJob.PostgreSQLJobConfig.class);
@@ -456,6 +477,13 @@ public class TaskProfileDto {
                 task.setSource(KAFKA_SOURCE);
                 profileDto.setTask(task);
                 break;
+            case PULSAR:
+                task.setTaskClass(DEFAULT_PULSAR_TASK);
+                PulsarTask pulsarTask = getPulsarTask(dataConfig);
+                task.setPulsarTask(pulsarTask);
+                task.setSource(PULSAR_SOURCE);
+                profileDto.setTask(task);
+                break;
             case POSTGRES:
                 PostgreSQLJob postgreSQLJob = getPostgresJob(dataConfig);
                 task.setPostgreSQLJob(postgreSQLJob);
@@ -528,6 +556,7 @@ public class TaskProfileDto {
         private FileTask fileTask;
         private BinlogJob binlogJob;
         private KafkaJob kafkaJob;
+        private PulsarTask pulsarTask;
         private PostgreSQLJob postgreSQLJob;
         private OracleJob oracleJob;
         private MongoJob mongoJob;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
new file mode 100644
index 0000000000..b960930bf7
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Sink;
+import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP;
+
+public class PulsarInstance extends Instance {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarInstance.class);
+    private Source source;
+    private Sink sink;
+    private InstanceProfile profile;
+    private InstanceManager instanceManager;
+    private volatile boolean running = false;
+    private volatile boolean inited = false;
+    private int checkFinishCount = 0;
+
+    public static final int CORE_THREAD_SLEEP_TIME = 1;
+    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
+    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
+    private int heartBreakCheckCount = 0;
+    private long heartBeatStartTime = AgentUtils.getCurrentTime();
+
+    @Override
+    public boolean init(Object srcManager, InstanceProfile srcProfile) {
+        try {
+            instanceManager = (InstanceManager) srcManager;
+            profile = srcProfile;
+            profile.set(TaskConstants.INODE_INFO, "");
+            LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
+                    profile.getInstanceId(), profile.toJsonStr());
+            source = (Source) 
Class.forName(profile.getSourceClass()).newInstance();
+            source.init(profile);
+            sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
+            sink.init(profile);
+            inited = true;
+            return true;
+        } catch (Throwable e) {
+            handleSourceDeleted();
+            doChangeState(State.FATAL);
+            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(), e);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            return false;
+        }
+    }
+
+    @Override
+    public void destroy() {
+        if (!inited) {
+            return;
+        }
+        doChangeState(State.SUCCEEDED);
+        while (running) {
+            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+        }
+        this.source.destroy();
+        this.sink.destroy();
+    }
+
+    @Override
+    public InstanceProfile getProfile() {
+        return profile;
+    }
+
+    @Override
+    public String getTaskId() {
+        return profile.getTaskId();
+    }
+
+    @Override
+    public String getInstanceId() {
+        return profile.getInstanceId();
+    }
+
+    @Override
+    public void run() {
+        while (!isFinished()) {
+            if (!source.sourceExist()) {
+                handleSourceDeleted();
+                break;
+            }
+
+            Message msg = source.read();
+            if (msg == null) {
+                if (source.sourceFinish() && sink.sinkFinish()) {
+                    checkFinishCount++;
+                    if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+                        handleReadEnd();
+                        break;
+                    }
+                } else {
+                    checkFinishCount = 0;
+                }
+                heartbeatStatic();
+                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            } else {
+                boolean suc = false;
+                while (!isFinished() && !suc) {
+                    suc = sink.write(msg);
+                    if (!suc) {
+                        heartbeatStatic();
+                        AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
+                    }
+                }
+                heartBreakCheckCount++;
+                if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) {
+                    heartbeatStatic();
+                }
+            }
+        }
+    }
+
+    private void handleSourceDeleted() {
+        profile.setState(InstanceStateEnum.DELETE);
+        profile.setModifyTime(AgentUtils.getCurrentTime());
+        InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
+            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+        }
+    }
+
+    private void handleReadEnd() {
+        InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}",
+                    instanceManager.getTaskId());
+            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+        }
+    }
+
+    private void heartbeatStatic() {
+        String inlongGroupId = profile.getInlongGroupId();
+        String inlongStreamId = profile.getInlongStreamId();
+        if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
+            heartBreakCheckCount = 0;
+            heartBeatStartTime = AgentUtils.getCurrentTime();
+        }
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 45fd00f75a..9acebc9ef7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -226,7 +226,6 @@ public class KafkaSource extends AbstractSource {
             emptyCount.set(0);
             long offset = 0L;
             for (ConsumerRecord<String, byte[]> record : records) {
-                LOGGER.info("record: {}", record.value());
                 SourceData sourceData = new SourceData(record.value(), 
record.offset());
                 boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
                 if (!suc4Queue) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
similarity index 64%
copy from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
copy to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 45fd00f75a..ebd8495d9f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -35,22 +35,18 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
@@ -65,19 +61,16 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static 
org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static 
org.apache.inlong.agent.constant.TaskConstants.JOB_OFFSET_DELIMITER;
 import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
 import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_RESET_TIME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SERVICE_URL;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION_POSITION;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION_TYPE;
 
-/**
- * kafka source, split kafka source job into multi readers
- */
-public class KafkaSource extends AbstractSource {
+public class PulsarSource extends AbstractSource {
 
     @Data
     @AllArgsConstructor
@@ -88,21 +81,25 @@ public class KafkaSource extends AbstractSource {
         private Long offset;
     }
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarSource.class);
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             1L, TimeUnit.SECONDS,
             new SynchronousQueue<>(),
-            new AgentThreadFactory("kafka-source"));
-    private BlockingQueue<KafkaSource.SourceData> queue;
+            new AgentThreadFactory("pulsar-source"));
+    private BlockingQueue<SourceData> queue;
     public InstanceProfile profile;
     private int maxPackSize;
+    private String inlongStreamId;
     private String taskId;
     private String instanceId;
     private String topic;
-    private Properties props = new Properties();
-    private String allPartitionOffsets;
-    Map<Integer, Long> partitionOffsets = new HashMap<>();
+    private String serviceUrl;
+    private String subscription;
+    private String subscriptionType;
+    private String subscriptionPosition;
+    private PulsarClient pulsarClient;
+    private Long timestamp;
     private volatile boolean running = false;
     private volatile boolean runnable = true;
     private volatile AtomicLong emptyCount = new AtomicLong(0);
@@ -111,21 +108,18 @@ public class KafkaSource extends AbstractSource {
     private final Integer READ_WAIT_TIMEOUT_MS = 10;
     private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
     private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
-
-    private static final String KAFKA_DESERIALIZER_METHOD =
-            "org.apache.kafka.common.serialization.ByteArrayDeserializer";
-    private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
     private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+    private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
     private boolean isRealTime = false;
     private boolean isRestoreFromDB = false;
 
-    public KafkaSource() {
+    public PulsarSource() {
     }
 
     @Override
     public void init(InstanceProfile profile) {
         try {
-            LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
+            LOGGER.info("PulsarSource init: {}", profile.toJsonStr());
             this.profile = profile;
             super.init(profile);
             String cycleUnit = profile.get(TASK_CYCLE_UNIT);
@@ -135,26 +129,18 @@ public class KafkaSource extends AbstractSource {
             }
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
             maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+            inlongStreamId = profile.getInlongStreamId();
             taskId = profile.getTaskId();
             instanceId = profile.getInstanceId();
             topic = profile.getInstanceId();
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
-            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
KAFKA_DESERIALIZER_METHOD);
-            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
KAFKA_DESERIALIZER_METHOD);
-            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
profile.get(TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET));
-            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-            allPartitionOffsets = profile.get(TASK_KAFKA_OFFSET);
+            serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
+            subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, 
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
+            subscriptionPosition = 
profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION,
+                    SubscriptionInitialPosition.Latest.name());
+            subscriptionType = profile.get(TASK_PULSAR_SUBSCRIPTION_TYPE, 
SubscriptionType.Shared.name());
+            timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
+            pulsarClient = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
             isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
-            if (!isRestoreFromDB && 
StringUtils.isNotBlank(allPartitionOffsets)) {
-                // example:0#110_1#666_2#222
-                String[] offsets = 
allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
-                for (String offset : offsets) {
-                    
partitionOffsets.put(Integer.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]),
-                            
Long.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
-                }
-            }
 
             EXECUTOR_SERVICE.execute(run());
         } catch (Exception ex) {
@@ -165,56 +151,42 @@ public class KafkaSource extends AbstractSource {
 
     private Runnable run() {
         return () -> {
-            AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" + 
instanceId);
+            AgentThreadFactory.nameThread("pulsar-source-" + taskId + "-" + 
instanceId);
             running = true;
             try {
-                List<PartitionInfo> partitionInfoList;
-                try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props)) {
-                    partitionInfoList = consumer.partitionsFor(topic);
-                }
-
-                props.put(KAFKA_SESSION_TIMEOUT, 30000);
-
-                try (KafkaConsumer<String, byte[]> kafkaConsumer = new 
KafkaConsumer<>(props)) {
-                    if (null != partitionInfoList) {
-                        List<TopicPartition> topicPartitions = new 
ArrayList<>();
-                        for (PartitionInfo partitionInfo : partitionInfoList) {
-                            TopicPartition topicPartition = new 
TopicPartition(partitionInfo.topic(),
-                                    partitionInfo.partition());
-                            topicPartitions.add(topicPartition);
-                        }
-                        kafkaConsumer.assign(topicPartitions);
-
-                        if (!isRestoreFromDB && 
StringUtils.isNotBlank(allPartitionOffsets)) {
-                            for (TopicPartition topicPartition : 
topicPartitions) {
-                                Long offset = 
partitionOffsets.get(topicPartition.partition());
-                                if (ObjectUtils.isNotEmpty(offset)) {
-                                    kafkaConsumer.seek(topicPartition, offset);
-                                }
-                            }
-                        } else {
-                            LOGGER.info("Skip to seek offset");
-                        }
+                try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer(Schema.BYTES)
+                        .topic(topic)
+                        .subscriptionName(subscription)
+                        
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
+                        
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+                        .subscribe()) {
+
+                    if (!isRestoreFromDB && timestamp != 0L) {
+                        consumer.seek(timestamp);
+                        LOGGER.info("Reset consume from {}", timestamp);
+                    } else {
+                        LOGGER.info("Skip to reset consume");
                     }
-                    doRun(kafkaConsumer);
+
+                    doRun(consumer);
                 }
             } catch (Throwable e) {
-                LOGGER.error("do run error maybe topic is configured 
incorrectly: ", e);
+                LOGGER.error("do run error maybe pulsar client is configured 
incorrectly: ", e);
             }
             running = false;
         };
     }
 
-    private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
+    private void doRun(Consumer<byte[]> consumer) throws PulsarClientException 
{
         long lastPrintTime = 0;
         while (isRunnable()) {
             boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
             if (!suc) {
                 break;
             }
-            ConsumerRecords<String, byte[]> records = 
kafkaConsumer.poll(Duration.ofMillis(1000));
+            org.apache.pulsar.client.api.Message<byte[]> message = 
consumer.receive(0, TimeUnit.MILLISECONDS);
             
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_TOTAL_LEN);
-            if (records.isEmpty()) {
+            if (ObjectUtils.isEmpty(message)) {
                 if (queue.isEmpty()) {
                     emptyCount.incrementAndGet();
                 } else {
@@ -225,25 +197,25 @@ public class KafkaSource extends AbstractSource {
             }
             emptyCount.set(0);
             long offset = 0L;
-            for (ConsumerRecord<String, byte[]> record : records) {
-                LOGGER.info("record: {}", record.value());
-                SourceData sourceData = new SourceData(record.value(), 
record.offset());
-                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
-                if (!suc4Queue) {
-                    break;
-                }
-                putIntoQueue(sourceData);
-                offset = record.offset();
+            SourceData sourceData = new SourceData(message.getValue(), 0L);
+            boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, message.getValue().length);
+            if (!suc4Queue) {
+                break;
             }
-            kafkaConsumer.commitSync();
+            putIntoQueue(sourceData);
+            consumer.acknowledge(message);
 
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
                 lastPrintTime = AgentUtils.getCurrentTime();
-                LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
+                LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
             }
         }
     }
 
+    public boolean isRunnable() {
+        return runnable;
+    }
+
     private boolean waitForPermit(String permitName, int permitLen) {
         boolean suc = false;
         while (!suc) {
@@ -274,24 +246,13 @@ public class KafkaSource extends AbstractSource {
             if (!offerSuc) {
                 
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
             }
-            LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(), 
topic);
+            LOGGER.debug("Read {} from pulsar topic {}", sourceData.getData(), 
topic);
         } catch (InterruptedException e) {
             
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
sourceData.data.length);
             LOGGER.error("fetchData offer failed {}", e.getMessage());
         }
     }
 
-    public boolean isRunnable() {
-        return runnable;
-    }
-
-    /**
-     * Stop running threads.
-     */
-    public void stopRunning() {
-        runnable = false;
-    }
-
     @Override
     public List<Reader> split(TaskProfile conf) {
         return null;
@@ -299,7 +260,7 @@ public class KafkaSource extends AbstractSource {
 
     @Override
     public Message read() {
-        KafkaSource.SourceData sourceData = null;
+        PulsarSource.SourceData sourceData = null;
         try {
             sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
@@ -313,14 +274,14 @@ public class KafkaSource extends AbstractSource {
         return finalMsg;
     }
 
-    private Message createMessage(SourceData sourceData) {
+    private Message createMessage(PulsarSource.SourceData sourceData) {
         String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
         Map<String, String> header = new HashMap<>();
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.offset.toString());
         header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
 
-        long auditTime = 0;
+        long auditTime;
         if (isRealTime) {
             auditTime = AgentUtils.getCurrentTime();
         } else {
@@ -351,4 +312,11 @@ public class KafkaSource extends AbstractSource {
     public boolean sourceExist() {
         return true;
     }
+
+    /**
+     * Stop running threads.
+     */
+    public void stopRunning() {
+        runnable = false;
+    }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
new file mode 100644
index 0000000000..1f5ce56c59
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_NAMESPACE;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TENANT;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TOPIC;
+
+public class PulsarTask extends Task {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarTask.class);
+    public static final String DEFAULT_PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+    public static final int CORE_THREAD_SLEEP_TIME = 5000;
+    public static final int CORE_THREAD_PRINT_TIME = 10000;
+
+    private TaskProfile taskProfile;
+    private Db basicDb;
+    private TaskManager taskManager;
+    private InstanceManager instanceManager;
+    private long lastPrintTime = 0;
+    private boolean initOK = false;
+    private volatile boolean running = false;
+    private boolean isAdded = false;
+    private boolean isRestoreFromDB = false;
+
+    private String tenant;
+    private String namespace;
+    private String topic;
+    private String instanceId;
+
+    @Override
+    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) 
throws IOException {
+        taskManager = (TaskManager) srcManager;
+        commonInit(taskProfile, basicDb);
+        initOK = true;
+    }
+
+    private void commonInit(TaskProfile taskProfile, Db basicDb) {
+        LOGGER.info("pulsar commonInit: {}", taskProfile.toJsonStr());
+        this.taskProfile = taskProfile;
+        this.basicDb = basicDb;
+        this.tenant = taskProfile.get(TASK_PULSAR_TENANT);
+        this.namespace = taskProfile.get(TASK_PULSAR_NAMESPACE);
+        this.topic = taskProfile.get(TASK_PULSAR_TOPIC);
+        this.instanceId = tenant + "/" + namespace + "/" + topic;
+
+        this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+                basicDb, taskManager.getTaskDb());
+        try {
+            instanceManager.start();
+        } catch (Exception e) {
+            LOGGER.error("start instance manager error: ", e);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        doChangeState(State.SUCCEEDED);
+        if (instanceManager != null) {
+            instanceManager.stop();
+        }
+    }
+
+    @Override
+    public TaskProfile getProfile() {
+        return taskProfile;
+    }
+
+    @Override
+    public String getTaskId() {
+        if (taskProfile == null) {
+            return "";
+        }
+        return taskProfile.getTaskId();
+    }
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("pulsar-task-core-" + getTaskId());
+        running = true;
+        try {
+            doRun();
+        } catch (Throwable e) {
+            LOGGER.error("do run error: ", e);
+        }
+        running = false;
+    }
+
+    private void doRun() {
+        while (!isFinished()) {
+            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
+                LOGGER.info("pulsar task running! taskId {}", getTaskId());
+                lastPrintTime = AgentUtils.getCurrentTime();
+            }
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+            if (!initOK) {
+                continue;
+            }
+
+            // Add instance profile to instance manager
+            addInstanceProfile();
+
+            String inlongGroupId = taskProfile.getInlongGroupId();
+            String inlongStreamId = taskProfile.getInlongStreamId();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
+        }
+    }
+
+    private void addInstanceProfile() {
+        if (isAdded) {
+            return;
+        }
+        String dataTime = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_PULSAR_INSTANCE, instanceId,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
+        InstanceAction action = new InstanceAction(ActionType.ADD, 
instanceProfile);
+        while (!isFinished() && !instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
+            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+        }
+        this.isAdded = true;
+    }
+}


Reply via email to