This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5bd1010743 [INLONG-9804][Agent] Add Pulsar source for Agent (#9805)
5bd1010743 is described below
commit 5bd1010743ab8efb66959039fa00e0761a48e28c
Author: haifxu <[email protected]>
AuthorDate: Wed Mar 13 21:32:26 2024 +0800
[INLONG-9804][Agent] Add Pulsar source for Agent (#9805)
Co-authored-by: Charles Zhang <[email protected]>
---
.../inlong/agent/constant/TaskConstants.java | 10 ++
.../org/apache/inlong/agent/pojo/PulsarTask.java | 46 ++++++
.../apache/inlong/agent/pojo/TaskProfileDto.java | 29 ++++
.../agent/plugin/instance/PulsarInstance.java | 183 +++++++++++++++++++++
.../inlong/agent/plugin/sources/KafkaSource.java | 1 -
.../{KafkaSource.java => PulsarSource.java} | 180 +++++++++-----------
.../inlong/agent/plugin/task/PulsarTask.java | 177 ++++++++++++++++++++
7 files changed, 519 insertions(+), 107 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index b1fea01366..138e5ee78f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -117,6 +117,16 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_KAFKA_READ_TIMEOUT =
"job.kafkaJob.read.timeout";
public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET =
"task.kafkaJob.autoOffsetReset";
+ // Pulsar task
+ public static final String TASK_PULSAR_TENANT = "task.pulsarTask.tenant";
+ public static final String TASK_PULSAR_NAMESPACE =
"task.pulsarTask.namespace";
+ public static final String TASK_PULSAR_TOPIC = "task.pulsarTask.topic";
+ public static final String TASK_PULSAR_SUBSCRIPTION =
"task.pulsarTask.subscription";
+ public static final String TASK_PULSAR_SUBSCRIPTION_TYPE =
"task.pulsarTask.subscriptionType";
+ public static final String TASK_PULSAR_SERVICE_URL =
"task.pulsarTask.serviceUrl";
+ public static final String TASK_PULSAR_SUBSCRIPTION_POSITION =
"task.pulsarTask.subscriptionPosition";
+ public static final String TASK_PULSAR_RESET_TIME =
"task.pulsarTask.resetTime";
+
public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
public static final String JOB_MONGO_USER = "job.mongoJob.user";
public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PulsarTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PulsarTask.java
new file mode 100644
index 0000000000..fdff14f9f2
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PulsarTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class PulsarTask {
+
+ private String tenant;
+ private String namespace;
+ private String topic;
+ private String subscription;
+ private String subscriptionType;
+ private String serviceUrl;
+ private String subscriptionPosition;
+ private String resetTime;
+
+ @Data
+ public static class PulsarTaskConfig {
+
+ private String pulsarTenant;
+ private String namespace;
+ private String topic;
+ private String subscription;
+ private String subscriptionType;
+ private String serviceUrl;
+ private String subscriptionPosition;
+ private String resetTime;
+ }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index d080ee734c..f3fcc927ed 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.Line;
+import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
@@ -39,6 +40,7 @@ public class TaskProfileDto {
public static final String DEFAULT_FILE_TASK =
"org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK =
"org.apache.inlong.agent.plugin.task.KafkaTask";
+ public static final String DEFAULT_PULSAR_TASK =
"org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_CHANNEL =
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK =
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -57,6 +59,8 @@ public class TaskProfileDto {
* kafka source
*/
public static final String KAFKA_SOURCE =
"org.apache.inlong.agent.plugin.sources.KafkaSource";
+ // pulsar source
+ public static final String PULSAR_SOURCE =
"org.apache.inlong.agent.plugin.sources.PulsarSource";
/**
* PostgreSQL source
*/
@@ -209,6 +213,23 @@ public class TaskProfileDto {
return kafkaJob;
}
+ private static PulsarTask getPulsarTask(DataConfig dataConfig) {
+ PulsarTaskConfig pulsarTaskConfig =
GSON.fromJson(dataConfig.getExtParams(),
+ PulsarTaskConfig.class);
+ PulsarTask pulsarTask = new PulsarTask();
+
+ pulsarTask.setTenant(pulsarTaskConfig.getPulsarTenant());
+ pulsarTask.setNamespace(pulsarTaskConfig.getNamespace());
+ pulsarTask.setTopic(pulsarTaskConfig.getTopic());
+ pulsarTask.setSubscription(pulsarTaskConfig.getSubscription());
+ pulsarTask.setSubscriptionType(pulsarTaskConfig.getSubscriptionType());
+ pulsarTask.setServiceUrl(pulsarTaskConfig.getServiceUrl());
+
pulsarTask.setSubscriptionPosition(pulsarTaskConfig.getSubscriptionPosition());
+ pulsarTask.setResetTime(pulsarTaskConfig.getResetTime());
+
+ return pulsarTask;
+ }
+
private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) {
PostgreSQLJob.PostgreSQLJobConfig config =
GSON.fromJson(dataConfigs.getExtParams(),
PostgreSQLJob.PostgreSQLJobConfig.class);
@@ -456,6 +477,13 @@ public class TaskProfileDto {
task.setSource(KAFKA_SOURCE);
profileDto.setTask(task);
break;
+ case PULSAR:
+ task.setTaskClass(DEFAULT_PULSAR_TASK);
+ PulsarTask pulsarTask = getPulsarTask(dataConfig);
+ task.setPulsarTask(pulsarTask);
+ task.setSource(PULSAR_SOURCE);
+ profileDto.setTask(task);
+ break;
case POSTGRES:
PostgreSQLJob postgreSQLJob = getPostgresJob(dataConfig);
task.setPostgreSQLJob(postgreSQLJob);
@@ -528,6 +556,7 @@ public class TaskProfileDto {
private FileTask fileTask;
private BinlogJob binlogJob;
private KafkaJob kafkaJob;
+ private PulsarTask pulsarTask;
private PostgreSQLJob postgreSQLJob;
private OracleJob oracleJob;
private MongoJob mongoJob;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
new file mode 100644
index 0000000000..b960930bf7
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Sink;
+import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP;
+
+public class PulsarInstance extends Instance {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarInstance.class);
+ private Source source;
+ private Sink sink;
+ private InstanceProfile profile;
+ private InstanceManager instanceManager;
+ private volatile boolean running = false;
+ private volatile boolean inited = false;
+ private int checkFinishCount = 0;
+
+ public static final int CORE_THREAD_SLEEP_TIME = 1;
+ private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+ private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
+ private final int WRITE_FAILED_WAIT_TIME_MS = 10;
+ private int heartBreakCheckCount = 0;
+ private long heartBeatStartTime = AgentUtils.getCurrentTime();
+
+ @Override
+ public boolean init(Object srcManager, InstanceProfile srcProfile) {
+ try {
+ instanceManager = (InstanceManager) srcManager;
+ profile = srcProfile;
+ profile.set(TaskConstants.INODE_INFO, "");
+ LOGGER.info("task id: {} submit new instance {} profile detail
{}.", profile.getTaskId(),
+ profile.getInstanceId(), profile.toJsonStr());
+ source = (Source)
Class.forName(profile.getSourceClass()).newInstance();
+ source.init(profile);
+ sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
+ sink.init(profile);
+ inited = true;
+ return true;
+ } catch (Throwable e) {
+ handleSourceDeleted();
+ doChangeState(State.FATAL);
+ LOGGER.error("init instance {} for task {} failed",
profile.getInstanceId(), profile.getInstanceId(), e);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ return false;
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (!inited) {
+ return;
+ }
+ doChangeState(State.SUCCEEDED);
+ while (running) {
+ AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+ }
+ this.source.destroy();
+ this.sink.destroy();
+ }
+
+ @Override
+ public InstanceProfile getProfile() {
+ return profile;
+ }
+
+ @Override
+ public String getTaskId() {
+ return profile.getTaskId();
+ }
+
+ @Override
+ public String getInstanceId() {
+ return profile.getInstanceId();
+ }
+
+ @Override
+ public void run() {
+ while (!isFinished()) {
+ if (!source.sourceExist()) {
+ handleSourceDeleted();
+ break;
+ }
+
+ Message msg = source.read();
+ if (msg == null) {
+ if (source.sourceFinish() && sink.sinkFinish()) {
+ checkFinishCount++;
+ if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+ handleReadEnd();
+ break;
+ }
+ } else {
+ checkFinishCount = 0;
+ }
+ heartbeatStatic();
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ } else {
+ boolean suc = false;
+ while (!isFinished() && !suc) {
+ suc = sink.write(msg);
+ if (!suc) {
+ heartbeatStatic();
+ AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
+ }
+ }
+ heartBreakCheckCount++;
+ if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) {
+ heartbeatStatic();
+ }
+ }
+ }
+ }
+
+ private void handleSourceDeleted() {
+ profile.setState(InstanceStateEnum.DELETE);
+ profile.setModifyTime(AgentUtils.getCurrentTime());
+ InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
+ AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ }
+ }
+
+ private void handleReadEnd() {
+ InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}",
+ instanceManager.getTaskId());
+ AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ }
+ }
+
+ private void heartbeatStatic() {
+ String inlongGroupId = profile.getInlongGroupId();
+ String inlongStreamId = profile.getInlongStreamId();
+ if (AgentUtils.getCurrentTime() - heartBeatStartTime >
TimeUnit.SECONDS.toMillis(1)) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT,
inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
+ heartBreakCheckCount = 0;
+ heartBeatStartTime = AgentUtils.getCurrentTime();
+ }
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 45fd00f75a..9acebc9ef7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -226,7 +226,6 @@ public class KafkaSource extends AbstractSource {
emptyCount.set(0);
long offset = 0L;
for (ConsumerRecord<String, byte[]> record : records) {
- LOGGER.info("record: {}", record.value());
SourceData sourceData = new SourceData(record.value(),
record.offset());
boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
if (!suc4Queue) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
similarity index 64%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 45fd00f75a..ebd8495d9f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -35,22 +35,18 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@@ -65,19 +61,16 @@ import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static
org.apache.inlong.agent.constant.TaskConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
-import static
org.apache.inlong.agent.constant.TaskConstants.JOB_OFFSET_DELIMITER;
import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET;
-import static
org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_BOOTSTRAP_SERVERS;
-import static org.apache.inlong.agent.constant.TaskConstants.TASK_KAFKA_OFFSET;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_RESET_TIME;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SERVICE_URL;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION_POSITION;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_SUBSCRIPTION_TYPE;
-/**
- * kafka source, split kafka source job into multi readers
- */
-public class KafkaSource extends AbstractSource {
+public class PulsarSource extends AbstractSource {
@Data
@AllArgsConstructor
@@ -88,21 +81,25 @@ public class KafkaSource extends AbstractSource {
private Long offset;
}
- private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSource.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarSource.class);
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
- new AgentThreadFactory("kafka-source"));
- private BlockingQueue<KafkaSource.SourceData> queue;
+ new AgentThreadFactory("pulsar-source"));
+ private BlockingQueue<SourceData> queue;
public InstanceProfile profile;
private int maxPackSize;
+ private String inlongStreamId;
private String taskId;
private String instanceId;
private String topic;
- private Properties props = new Properties();
- private String allPartitionOffsets;
- Map<Integer, Long> partitionOffsets = new HashMap<>();
+ private String serviceUrl;
+ private String subscription;
+ private String subscriptionType;
+ private String subscriptionPosition;
+ private PulsarClient pulsarClient;
+ private Long timestamp;
private volatile boolean running = false;
private volatile boolean runnable = true;
private volatile AtomicLong emptyCount = new AtomicLong(0);
@@ -111,21 +108,18 @@ public class KafkaSource extends AbstractSource {
private final Integer READ_WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
private final Integer BATCH_TOTAL_LEN = 1024 * 1024;
-
- private static final String KAFKA_DESERIALIZER_METHOD =
- "org.apache.kafka.common.serialization.ByteArrayDeserializer";
- private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
+ private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
private boolean isRealTime = false;
private boolean isRestoreFromDB = false;
- public KafkaSource() {
+ public PulsarSource() {
}
@Override
public void init(InstanceProfile profile) {
try {
- LOGGER.info("KafkaSource init: {}", profile.toJsonStr());
+ LOGGER.info("PulsarSource init: {}", profile.toJsonStr());
this.profile = profile;
super.init(profile);
String cycleUnit = profile.get(TASK_CYCLE_UNIT);
@@ -135,26 +129,18 @@ public class KafkaSource extends AbstractSource {
}
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
maxPackSize = profile.getInt(PROXY_PACKAGE_MAX_SIZE,
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+ inlongStreamId = profile.getInlongStreamId();
taskId = profile.getTaskId();
instanceId = profile.getInstanceId();
topic = profile.getInstanceId();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
profile.get(TASK_KAFKA_BOOTSTRAP_SERVERS));
- props.put(ConsumerConfig.GROUP_ID_CONFIG, taskId);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
KAFKA_DESERIALIZER_METHOD);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KAFKA_DESERIALIZER_METHOD);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
profile.get(TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET));
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
- allPartitionOffsets = profile.get(TASK_KAFKA_OFFSET);
+ serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
+ subscription = profile.get(TASK_PULSAR_SUBSCRIPTION,
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
+ subscriptionPosition =
profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION,
+ SubscriptionInitialPosition.Latest.name());
+ subscriptionType = profile.get(TASK_PULSAR_SUBSCRIPTION_TYPE,
SubscriptionType.Shared.name());
+ timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
+ pulsarClient =
PulsarClient.builder().serviceUrl(serviceUrl).build();
isRestoreFromDB = profile.getBoolean(RESTORE_FROM_DB, false);
- if (!isRestoreFromDB &&
StringUtils.isNotBlank(allPartitionOffsets)) {
- // example:0#110_1#666_2#222
- String[] offsets =
allPartitionOffsets.split(JOB_OFFSET_DELIMITER);
- for (String offset : offsets) {
-
partitionOffsets.put(Integer.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[0]),
-
Long.valueOf(offset.split(JOB_KAFKA_PARTITION_OFFSET_DELIMITER)[1]));
- }
- }
EXECUTOR_SERVICE.execute(run());
} catch (Exception ex) {
@@ -165,56 +151,42 @@ public class KafkaSource extends AbstractSource {
private Runnable run() {
return () -> {
- AgentThreadFactory.nameThread("kafka-source-" + taskId + "-" +
instanceId);
+ AgentThreadFactory.nameThread("pulsar-source-" + taskId + "-" +
instanceId);
running = true;
try {
- List<PartitionInfo> partitionInfoList;
- try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props)) {
- partitionInfoList = consumer.partitionsFor(topic);
- }
-
- props.put(KAFKA_SESSION_TIMEOUT, 30000);
-
- try (KafkaConsumer<String, byte[]> kafkaConsumer = new
KafkaConsumer<>(props)) {
- if (null != partitionInfoList) {
- List<TopicPartition> topicPartitions = new
ArrayList<>();
- for (PartitionInfo partitionInfo : partitionInfoList) {
- TopicPartition topicPartition = new
TopicPartition(partitionInfo.topic(),
- partitionInfo.partition());
- topicPartitions.add(topicPartition);
- }
- kafkaConsumer.assign(topicPartitions);
-
- if (!isRestoreFromDB &&
StringUtils.isNotBlank(allPartitionOffsets)) {
- for (TopicPartition topicPartition :
topicPartitions) {
- Long offset =
partitionOffsets.get(topicPartition.partition());
- if (ObjectUtils.isNotEmpty(offset)) {
- kafkaConsumer.seek(topicPartition, offset);
- }
- }
- } else {
- LOGGER.info("Skip to seek offset");
- }
+ try (Consumer<byte[]> consumer =
pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName(subscription)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
+
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+ .subscribe()) {
+
+ if (!isRestoreFromDB && timestamp != 0L) {
+ consumer.seek(timestamp);
+ LOGGER.info("Reset consume from {}", timestamp);
+ } else {
+ LOGGER.info("Skip to reset consume");
}
- doRun(kafkaConsumer);
+
+ doRun(consumer);
}
} catch (Throwable e) {
- LOGGER.error("do run error maybe topic is configured
incorrectly: ", e);
+ LOGGER.error("do run error maybe pulsar client is configured
incorrectly: ", e);
}
running = false;
};
}
- private void doRun(KafkaConsumer<String, byte[]> kafkaConsumer) {
+ private void doRun(Consumer<byte[]> consumer) throws PulsarClientException
{
long lastPrintTime = 0;
while (isRunnable()) {
boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
if (!suc) {
break;
}
- ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(Duration.ofMillis(1000));
+ org.apache.pulsar.client.api.Message<byte[]> message =
consumer.receive(0, TimeUnit.MILLISECONDS);
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_TOTAL_LEN);
- if (records.isEmpty()) {
+ if (ObjectUtils.isEmpty(message)) {
if (queue.isEmpty()) {
emptyCount.incrementAndGet();
} else {
@@ -225,25 +197,25 @@ public class KafkaSource extends AbstractSource {
}
emptyCount.set(0);
long offset = 0L;
- for (ConsumerRecord<String, byte[]> record : records) {
- LOGGER.info("record: {}", record.value());
- SourceData sourceData = new SourceData(record.value(),
record.offset());
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, record.value().length);
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(sourceData);
- offset = record.offset();
+ SourceData sourceData = new SourceData(message.getValue(), 0L);
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, message.getValue().length);
+ if (!suc4Queue) {
+ break;
}
- kafkaConsumer.commitSync();
+ putIntoQueue(sourceData);
+ consumer.acknowledge(message);
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("kafka topic is {}, offset is {}", topic, offset);
+ LOGGER.info("pulsar topic is {}, offset is {}", topic, offset);
}
}
}
+ public boolean isRunnable() {
+ return runnable;
+ }
+
private boolean waitForPermit(String permitName, int permitLen) {
boolean suc = false;
while (!suc) {
@@ -274,24 +246,13 @@ public class KafkaSource extends AbstractSource {
if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
}
- LOGGER.debug("Read {} from kafka topic {}", sourceData.getData(),
topic);
+ LOGGER.debug("Read {} from pulsar topic {}", sourceData.getData(),
topic);
} catch (InterruptedException e) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length);
LOGGER.error("fetchData offer failed {}", e.getMessage());
}
}
- public boolean isRunnable() {
- return runnable;
- }
-
- /**
- * Stop running threads.
- */
- public void stopRunning() {
- runnable = false;
- }
-
@Override
public List<Reader> split(TaskProfile conf) {
return null;
@@ -299,7 +260,7 @@ public class KafkaSource extends AbstractSource {
@Override
public Message read() {
- KafkaSource.SourceData sourceData = null;
+ PulsarSource.SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -313,14 +274,14 @@ public class KafkaSource extends AbstractSource {
return finalMsg;
}
- private Message createMessage(SourceData sourceData) {
+ private Message createMessage(PulsarSource.SourceData sourceData) {
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.offset.toString());
header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
- long auditTime = 0;
+ long auditTime;
if (isRealTime) {
auditTime = AgentUtils.getCurrentTime();
} else {
@@ -351,4 +312,11 @@ public class KafkaSource extends AbstractSource {
public boolean sourceExist() {
return true;
}
+
+ /**
+ * Stop running threads.
+ */
+ public void stopRunning() {
+ runnable = false;
+ }
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
new file mode 100644
index 0000000000..1f5ce56c59
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_NAMESPACE;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TENANT;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_PULSAR_TOPIC;
+
+public class PulsarTask extends Task {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarTask.class);
+ public static final String DEFAULT_PULSAR_INSTANCE =
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+ public static final int CORE_THREAD_SLEEP_TIME = 5000;
+ public static final int CORE_THREAD_PRINT_TIME = 10000;
+
+ private TaskProfile taskProfile;
+ private Db basicDb;
+ private TaskManager taskManager;
+ private InstanceManager instanceManager;
+ private long lastPrintTime = 0;
+ private boolean initOK = false;
+ private volatile boolean running = false;
+ private boolean isAdded = false;
+ private boolean isRestoreFromDB = false;
+
+ private String tenant;
+ private String namespace;
+ private String topic;
+ private String instanceId;
+
+ @Override
+ public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
+ taskManager = (TaskManager) srcManager;
+ commonInit(taskProfile, basicDb);
+ initOK = true;
+ }
+
+ private void commonInit(TaskProfile taskProfile, Db basicDb) {
+ LOGGER.info("pulsar commonInit: {}", taskProfile.toJsonStr());
+ this.taskProfile = taskProfile;
+ this.basicDb = basicDb;
+ this.tenant = taskProfile.get(TASK_PULSAR_TENANT);
+ this.namespace = taskProfile.get(TASK_PULSAR_NAMESPACE);
+ this.topic = taskProfile.get(TASK_PULSAR_TOPIC);
+ this.instanceId = tenant + "/" + namespace + "/" + topic;
+
+ this.isRestoreFromDB = taskProfile.getBoolean(RESTORE_FROM_DB, false);
+ instanceManager = new InstanceManager(taskProfile.getTaskId(), 1,
+ basicDb, taskManager.getTaskDb());
+ try {
+ instanceManager.start();
+ } catch (Exception e) {
+ LOGGER.error("start instance manager error: ", e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ doChangeState(State.SUCCEEDED);
+ if (instanceManager != null) {
+ instanceManager.stop();
+ }
+ }
+
+ @Override
+ public TaskProfile getProfile() {
+ return taskProfile;
+ }
+
+ @Override
+ public String getTaskId() {
+ if (taskProfile == null) {
+ return "";
+ }
+ return taskProfile.getTaskId();
+ }
+
+ @Override
+ public boolean isProfileValid(TaskProfile profile) {
+ if (!profile.allRequiredKeyExist()) {
+ LOGGER.error("task profile needs all required key");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void addCallbacks() {
+
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("pulsar-task-core-" + getTaskId());
+ running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ }
+ running = false;
+ }
+
+ private void doRun() {
+ while (!isFinished()) {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("pulsar task running! taskId {}", getTaskId());
+ lastPrintTime = AgentUtils.getCurrentTime();
+ }
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ if (!initOK) {
+ continue;
+ }
+
+ // Add instance profile to instance manager
+ addInstanceProfile();
+
+ String inlongGroupId = taskProfile.getInlongGroupId();
+ String inlongStreamId = taskProfile.getInlongStreamId();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
+ AgentUtils.getCurrentTime(), 1, 1);
+ }
+ }
+
+ private void addInstanceProfile() {
+ if (isAdded) {
+ return;
+ }
+ String dataTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+ InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_PULSAR_INSTANCE, instanceId,
+ CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+ LOGGER.info("taskProfile.createInstanceProfile: {}",
instanceProfile.toJsonStr());
+ InstanceAction action = new InstanceAction(ActionType.ADD,
instanceProfile);
+ while (!isFinished() && !instanceManager.submitAction(action)) {
+ LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ }
+ this.isAdded = true;
+ }
+}