This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8bd82cd2 [INLONG-6944][SDK] Remove useless code of SortSDK (#6951)
f8bd82cd2 is described below
commit f8bd82cd23849de2f80bf4bb9f23aa9aadb2afda
Author: vernedeng <[email protected]>
AuthorDate: Mon Dec 19 16:34:40 2022 +0800
[INLONG-6944][SDK] Remove useless code of SortSDK (#6951)
---
.../inlong/sdk/sort/api/InLongTopicFetcher.java | 82 ----
.../inlong/sdk/sort/api/InlongTopicManager.java | 49 --
.../inlong/sdk/sort/api/SortClientFactory.java | 25 --
.../sdk/sort/impl/InlongTopicManagerImpl.java | 492 ---------------------
.../inlong/sdk/sort/impl/ManagerReporter.java | 30 +-
.../inlong/sdk/sort/impl/SortClientImpl.java | 33 +-
.../inlong/sdk/sort/impl/SortClientImplV2.java | 159 -------
.../sort/impl/kafka/InLongKafkaFetcherImpl.java | 343 --------------
.../sort/impl/pulsar/InLongPulsarFetcherImpl.java | 332 --------------
.../sdk/sort/impl/tube/InLongTubeFetcherImpl.java | 320 --------------
.../sdk/sort/impl/InlongTopicManagerImplTest.java | 36 +-
.../sort/impl/decode/MessageDeserializerTest.java | 17 -
.../impl/kafka/InLongKafkaFetcherImplTest.java | 33 +-
.../impl/pulsar/InLongPulsarFetcherImplTest.java | 59 +--
.../sort/impl/tube/InLongTubeFetcherImplTest.java | 19 +-
.../standalone/source/sortsdk/SortSdkSource.java | 6 +-
16 files changed, 96 insertions(+), 1939 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
deleted file mode 100644
index f65645129..000000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.api;
-
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
-import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;
-
-import java.util.Objects;
-import java.util.Optional;
-
-@Deprecated
-public abstract class InLongTopicFetcher {
-
- protected InLongTopic inLongTopic;
- protected ClientContext context;
- protected Deserializer deserializer;
- protected volatile Thread fetchThread;
- protected volatile boolean closed = false;
- protected volatile boolean isStopConsume = false;
- // use for empty topic to sleep
- protected long sleepTime = 0L;
- protected int emptyFetchTimes = 0;
- // for rollback
- protected Interceptor interceptor;
- protected Seeker seeker;
-
- public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
- this.inLongTopic = inLongTopic;
- this.context = context;
- this.deserializer = new MessageDeserializer();
- this.interceptor = new MsgTimeInterceptor();
- this.interceptor.configure(inLongTopic);
- }
-
- public abstract boolean init(Object client);
-
- public abstract void ack(String msgOffset) throws Exception;
-
- public abstract void pause();
-
- public abstract void resume();
-
- public abstract boolean close();
-
- public abstract boolean isClosed();
-
- public abstract void stopConsume(boolean stopConsume);
-
- public abstract boolean isConsumeStop();
-
- public abstract InLongTopic getInLongTopic();
-
- public abstract long getConsumedDataSize();
-
- public abstract long getAckedOffset();
-
- public boolean updateTopic(InLongTopic topic) {
- if (Objects.equals(inLongTopic, topic)) {
- return false;
- }
- this.inLongTopic = topic;
- Optional.ofNullable(seeker).ifPresent(seeker ->
seeker.configure(inLongTopic));
- Optional.ofNullable(interceptor).ifPresent(interceptor ->
interceptor.configure(inLongTopic));
- return true;
- }
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java
deleted file mode 100644
index 4bcfee17a..000000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.api;
-
-import java.util.Collection;
-import java.util.Set;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-
-@Deprecated
-public abstract class InlongTopicManager implements Cleanable {
-
- protected ClientContext context;
- protected QueryConsumeConfig queryConsumeConfig;
-
- public InlongTopicManager(ClientContext context, QueryConsumeConfig
queryConsumeConfig) {
- this.context = context;
- this.queryConsumeConfig = queryConsumeConfig;
- }
-
- public abstract InLongTopicFetcher addFetcher(InLongTopic inLongTopic);
-
- public abstract InLongTopicFetcher removeFetcher(InLongTopic inLongTopic,
boolean closeFetcher);
-
- public abstract InLongTopicFetcher getFetcher(String fetchKey);
-
- public abstract Collection<InLongTopicFetcher> getAllFetchers();
-
- public abstract Set<String> getManagedInLongTopics();
-
- public abstract void offlineAllTopicsAndPartitions();
-
- public abstract void close();
-
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
index 8feba4d06..b5b0a4113 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
@@ -18,43 +18,18 @@
package org.apache.inlong.sdk.sort.api;
import org.apache.inlong.sdk.sort.impl.SortClientImpl;
-import org.apache.inlong.sdk.sort.impl.SortClientImplV2;
/**
* Factory of {@link SortClient}
*/
public class SortClientFactory {
- /**
- * create default SortClient
- *
- * @param config SortClientConfig
- * @return SortClient
- */
public static SortClient createSortClient(SortClientConfig config) {
return new SortClientImpl(config);
}
- /**
- * create SortClient with user defined QueryConsumeConfig ,MetricReporter
and ManagerReportHandler
- *
- * @param config SortClientConfig
- * @param queryConsumeConfig QueryConsumeConfig
- * @param reporter MetricReporter
- * @param reportHandler ManagerReportHandler
- * @return SortClient
- */
public static SortClient createSortClient(SortClientConfig config,
QueryConsumeConfig queryConsumeConfig,
MetricReporter reporter, ManagerReportHandler reportHandler) {
return new SortClientImpl(config, queryConsumeConfig, reporter,
reportHandler);
}
-
- public static SortClient createSortClientV2(SortClientConfig config) {
- return new SortClientImplV2(config);
- }
-
- public static SortClient createSortClientV2(SortClientConfig config,
QueryConsumeConfig queryConsumeConfig,
- MetricReporter reporter, ManagerReportHandler reportHandler) {
- return new SortClientImplV2(config, queryConsumeConfig, reporter,
reportHandler);
- }
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java
deleted file mode 100644
index 41e41a735..000000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
-import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
-import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl;
-import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
-import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl;
-import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
-import org.apache.inlong.sdk.sort.util.PeriodicTask;
-import org.apache.inlong.sdk.sort.util.StringUtil;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class InlongTopicManagerImpl extends InlongTopicManager {
-
- private final Logger logger =
LoggerFactory.getLogger(InlongTopicManagerImpl.class);
-
- private final ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new
ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new
ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories
= new ConcurrentHashMap<>();
-
- private final PeriodicTask updateMetaDataWorker;
- private volatile List<String> toBeSelectFetchers = new ArrayList<>();
- private boolean stopAssign = false;
-
- public InlongTopicManagerImpl(ClientContext context, QueryConsumeConfig
queryConsumeConfig) {
- super(context, queryConsumeConfig);
- updateMetaDataWorker = new
UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(),
- TimeUnit.SECONDS);
- String threadName = "sortsdk_inlongtopic_manager_" +
context.getConfig().getSortTaskId()
- + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd
HH:mm:ss");
- updateMetaDataWorker.start(threadName);
- }
-
- private void updateToBeSelectFetchers(Collection<String> c) {
- toBeSelectFetchers = new ArrayList<>(c);
- }
-
- private boolean initFetcher(InLongTopicFetcher fetcher, InLongTopic
inLongTopic) {
- if
(InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("create fetcher topic is pulsar {}", inLongTopic);
- return
fetcher.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()));
- } else if
(InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("create fetcher topic is kafka {}", inLongTopic);
- return
fetcher.init(inLongTopic.getInLongCluster().getBootstraps());
- } else if
(InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("create fetcher topic is tube {}", inLongTopic);
- return
fetcher.init(tubeFactories.get(inLongTopic.getInLongCluster().getClusterId()));
- } else {
- logger.error("create fetcher topic type not support " +
inLongTopic.getTopicType());
- return false;
- }
- }
-
- @Override
- public InLongTopicFetcher addFetcher(InLongTopic inLongTopic) {
-
- try {
- InLongTopicFetcher result =
fetchers.get(inLongTopic.getTopicKey());
- if (result == null) {
- // create fetcher (pulsar,tube,kafka)
- InLongTopicFetcher inLongTopicFetcher =
createInLongTopicFetcher(inLongTopic);
- InLongTopicFetcher preValue =
fetchers.putIfAbsent(inLongTopic.getTopicKey(), inLongTopicFetcher);
- logger.info("addFetcher :{}", inLongTopic.getTopicKey());
- if (preValue != null) {
- result = preValue;
- if (inLongTopicFetcher != null) {
- inLongTopicFetcher.close();
- }
- logger.info("addFetcher create same fetcher {}",
inLongTopic);
- } else {
- result = inLongTopicFetcher;
- if (result != null
- && !initFetcher(result, inLongTopic)) {
- logger.info("addFetcher init fail {}",
inLongTopic.getTopicKey());
- result.close();
- result = null;
- }
- }
- }
- return result;
- } finally {
- updateToBeSelectFetchers(fetchers.keySet());
- }
- }
-
- /**
- * create fetcher (pulsar,tube,kafka)
- *
- * @param inLongTopic {@link InLongTopic}
- * @return {@link InLongTopicFetcher}
- */
- private InLongTopicFetcher createInLongTopicFetcher(InLongTopic
inLongTopic) {
- if
(InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("the topic is pulsar {}", inLongTopic);
- return new InLongPulsarFetcherImpl(inLongTopic, context);
- } else if
(InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("the topic is kafka {}", inLongTopic);
- return new InLongKafkaFetcherImpl(inLongTopic, context);
- } else if
(InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("the topic is tube {}", inLongTopic);
- return new InLongTubeFetcherImpl(inLongTopic, context);
- } else {
- logger.error("topic type not support " +
inLongTopic.getTopicType());
- return null;
- }
- }
-
- @Override
- public InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean
closeFetcher) {
- InLongTopicFetcher result = fetchers.remove(inLongTopic.getTopicKey());
- if (result != null && closeFetcher) {
- result.close();
- }
- return result;
- }
-
- @Override
- public InLongTopicFetcher getFetcher(String fetchKey) {
- return fetchers.get(fetchKey);
- }
-
- @Override
- public Set<String> getManagedInLongTopics() {
- return new HashSet<>(fetchers.keySet());
- }
-
- @Override
- public Collection<InLongTopicFetcher> getAllFetchers() {
- return fetchers.values();
- }
-
- /**
- * offline all inlong topic
- */
- @Override
- public void offlineAllTopicsAndPartitions() {
- String subscribeId = context.getConfig().getSortTaskId();
- try {
- logger.info("start offline {}", subscribeId);
- stopAssign = true;
- closeAllFetcher();
- logger.info("close finished {}", subscribeId);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
-
- @Override
- public void close() {
- if (updateMetaDataWorker != null) {
- updateMetaDataWorker.stop();
- }
- }
-
- @Override
- public boolean clean() {
- String sortTaskId = context.getConfig().getSortTaskId();
- try {
- logger.info("start close {}", sortTaskId);
-
- if (updateMetaDataWorker != null) {
- updateMetaDataWorker.stop();
- }
-
- closeFetcher();
- closePulsarClient();
- closeTubeSessionFactory();
- logger.info("close finished {}", sortTaskId);
- return true;
- } catch (Throwable th) {
- logger.error("close error " + sortTaskId, th);
- }
- return false;
- }
-
- private void closeAllFetcher() {
- closeFetcher();
- }
-
- private void closeFetcher() {
- Set<Entry<String, InLongTopicFetcher>> entries = fetchers.entrySet();
- for (Entry<String, InLongTopicFetcher> entry : entries) {
- String fetchKey = entry.getKey();
- InLongTopicFetcher inLongTopicFetcher = entry.getValue();
- boolean succ = false;
- if (inLongTopicFetcher != null) {
- try {
- succ = inLongTopicFetcher.close();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- logger.info(" close fetcher{} {}", fetchKey, succ);
- }
- }
-
- private void closePulsarClient() {
- for (Map.Entry<String, PulsarClient> entry : pulsarClients.entrySet())
{
- PulsarClient pulsarClient = entry.getValue();
- String key = entry.getKey();
- try {
- if (pulsarClient != null) {
- pulsarClient.close();
- }
- } catch (Exception e) {
- logger.error("close PulsarClient" + key + " error.", e);
- }
- }
- pulsarClients.clear();
- }
-
- private void closeTubeSessionFactory() {
- for (Map.Entry<String, TubeConsumerCreator> entry :
tubeFactories.entrySet()) {
- MessageSessionFactory tubeMessageSessionFactory =
entry.getValue().getMessageSessionFactory();
- String key = entry.getKey();
- try {
- if (tubeMessageSessionFactory != null) {
- tubeMessageSessionFactory.shutdown();
- }
- } catch (Exception e) {
- logger.error("close MessageSessionFactory" + key + " error.",
e);
- }
- }
- tubeFactories.clear();
- }
-
- private List<String> getNewTopics(List<InLongTopic>
newSubscribedInLongTopics) {
- if (newSubscribedInLongTopics != null &&
newSubscribedInLongTopics.size() > 0) {
- List<String> newTopics = new ArrayList<>();
- for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
- newTopics.add(inLongTopic.getTopicKey());
- }
- return newTopics;
- }
- return null;
- }
-
- private void handleCurrentConsumeConfig(List<InLongTopic>
currentConsumeConfig) {
- if (null == currentConsumeConfig) {
- logger.warn("List<InLongTopic> currentConsumeConfig is null");
- return;
- }
-
- List<InLongTopic> newConsumeConfig = new
ArrayList<>(currentConsumeConfig);
- logger.debug("newConsumeConfig List:{}",
Arrays.toString(newConsumeConfig.toArray()));
- List<String> newTopics = getNewTopics(newConsumeConfig);
- logger.debug("newTopics :{}", Arrays.toString(newTopics.toArray()));
-
- List<String> oldInLongTopics = new ArrayList<>(fetchers.keySet());
- logger.debug("oldInLongTopics :{}",
Arrays.toString(oldInLongTopics.toArray()));
- // get need be offlined topics
- oldInLongTopics.removeAll(newTopics);
- logger.debug("removed oldInLongTopics :{}",
Arrays.toString(oldInLongTopics.toArray()));
-
- // get new topics
- newTopics.removeAll(new ArrayList<>(fetchers.keySet()));
- logger.debug("really new topics :{}",
Arrays.toString(newTopics.toArray()));
- // offline need be offlined topics
- offlineRmovedTopic(oldInLongTopics);
- // online new topics
- onlineNewTopic(newConsumeConfig, newTopics);
- }
-
- /**
- * offline inlong topic which not belong the sortTaskId
- *
- * @param oldInLongTopics {@link List}
- */
- private void offlineRmovedTopic(List<String> oldInLongTopics) {
- for (String fetchKey : oldInLongTopics) {
- logger.info("offlineRmovedTopic {}", fetchKey);
- InLongTopic inLongTopic = fetchers.get(fetchKey).getInLongTopic();
- InLongTopicFetcher inLongTopicFetcher =
fetchers.getOrDefault(fetchKey, null);
- if (inLongTopicFetcher != null) {
- inLongTopicFetcher.close();
- }
- fetchers.remove(fetchKey);
- if (context != null && context.getStatManager() != null &&
inLongTopic != null) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
- inLongTopic.getTopic())
- .addTopicOfflineTimes(1);
- } else {
- logger.error("context == null or context.getStatManager() ==
null or inLongTopic == null :{}",
- inLongTopic);
- }
- }
- }
-
- /**
- * online new inlong topic
- *
- * @param newSubscribedInLongTopics List
- * @param reallyNewTopic List
- */
- private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics,
List<String> reallyNewTopic) {
- for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
- if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
-
logger.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
- continue;
- }
- onlineTopic(inLongTopic);
- }
- }
-
- private void onlineTopic(InLongTopic inLongTopic) {
- if
(InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("the topic is pulsar:{}", inLongTopic);
- onlinePulsarTopic(inLongTopic);
- } else if
(InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("the topic is kafka:{}", inLongTopic);
- onlineKafkaTopic(inLongTopic);
- } else if
(InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
{
- logger.info("the topic is tube:{}", inLongTopic);
- onlineTubeTopic(inLongTopic);
- } else {
- logger.error("topic type:{} not support",
inLongTopic.getTopicType());
- }
- }
-
- private void onlinePulsarTopic(InLongTopic inLongTopic) {
- if (!checkAndCreateNewPulsarClient(inLongTopic)) {
- logger.error("checkAndCreateNewPulsarClient error:{}",
inLongTopic);
- return;
- }
- createNewFetcher(inLongTopic);
- }
-
- private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
- if
(!pulsarClients.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
- if (inLongTopic.getInLongCluster().getBootstraps() != null) {
- try {
- PulsarClient pulsarClient = PulsarClient.builder()
-
.serviceUrl(inLongTopic.getInLongCluster().getBootstraps())
-
.authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken()))
-
.statsInterval(context.getConfig().getStatsIntervalSeconds(), TimeUnit.SECONDS)
- .build();
-
pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), pulsarClient);
- logger.debug("create pulsar client succ {}",
- new
String[]{inLongTopic.getInLongCluster().getClusterId(),
-
inLongTopic.getInLongCluster().getBootstraps(),
-
inLongTopic.getInLongCluster().getToken()});
- } catch (Exception e) {
- logger.error("create pulsar client error {}", inLongTopic);
- logger.error(e.getMessage(), e);
- return false;
- }
- } else {
- logger.error("bootstrap is null {}",
inLongTopic.getInLongCluster());
- return false;
- }
- }
- logger.info("create pulsar client true {}", inLongTopic);
- return true;
- }
-
- private boolean checkAndCreateNewTubeSessionFactory(InLongTopic
inLongTopic) {
- if
(!tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
- if (inLongTopic.getInLongCluster().getBootstraps() != null) {
- try {
- // create MessageSessionFactory
- TubeClientConfig tubeConfig = new
TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps());
- MessageSessionFactory messageSessionFactory = new
TubeSingleSessionFactory(tubeConfig);
- TubeConsumerCreator tubeConsumerCreator = new
TubeConsumerCreator(messageSessionFactory,
- tubeConfig);
-
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(),
tubeConsumerCreator);
- logger.debug("create tube client succ {} {} {}",
- new
String[]{inLongTopic.getInLongCluster().getClusterId(),
-
inLongTopic.getInLongCluster().getBootstraps(),
-
inLongTopic.getInLongCluster().getToken()});
- } catch (Exception e) {
- logger.error("create tube client error {}", inLongTopic);
- logger.error(e.getMessage(), e);
- return false;
- }
- } else {
- logger.info("bootstrap is null {}",
inLongTopic.getInLongCluster());
- return false;
- }
- }
- logger.info("create pulsar client true {}", inLongTopic);
- return true;
- }
-
- private void onlineKafkaTopic(InLongTopic inLongTopic) {
- createNewFetcher(inLongTopic);
- }
-
- private void onlineTubeTopic(InLongTopic inLongTopic) {
- if (!checkAndCreateNewTubeSessionFactory(inLongTopic)) {
- logger.error("checkAndCreateNewPulsarClient error:{}",
inLongTopic);
- return;
- }
- createNewFetcher(inLongTopic);
- }
-
- private void createNewFetcher(InLongTopic inLongTopic) {
- if (!fetchers.containsKey(inLongTopic.getTopicKey())) {
- logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
- if (context != null && context.getStatManager() != null) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addTopicOnlineTimes(1);
- InLongTopicFetcher fetcher = addFetcher(inLongTopic);
- if (fetcher == null) {
- fetchers.remove(inLongTopic.getTopicKey());
- logger.error("add fetcher error:{}",
inLongTopic.getTopicKey());
- }
- } else {
- logger.error("context == null or context.getStatManager() ==
null");
- }
- }
- }
-
- private class UpdateMetaDataThread extends PeriodicTask {
-
- public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) {
- super(runInterval, timeUnit, context.getConfig());
- }
-
- @Override
- protected void doWork() {
- logger.debug("InLongTopicManagerImpl doWork");
- if (stopAssign) {
- logger.warn("assign is stoped");
- return;
- }
- // get sortTask conf from manager
- if (queryConsumeConfig != null) {
- long start = System.currentTimeMillis();
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId())
- .addRequestManagerTimes(1);
- ConsumeConfig consumeConfig = queryConsumeConfig
-
.queryCurrentConsumeConfig(context.getConfig().getSortTaskId());
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId())
- .addRequestManagerTimeCost(System.currentTimeMillis()
- start);
-
- if (consumeConfig != null) {
- handleCurrentConsumeConfig(consumeConfig.getTopics());
- } else {
- logger.warn("subscribedInfo is null");
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId())
- .addRequestManagerFailTimes(1);
- }
- } else {
- logger.error("subscribedMetaDataInfo is null");
- }
- }
- }
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
index 8c7a3b2a8..f2fdd5668 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
@@ -25,22 +25,23 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
import org.apache.inlong.sdk.sort.api.ManagerReportHandler;
import org.apache.inlong.sdk.sort.api.ReportApi;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicManager;
import org.apache.inlong.sdk.sort.entity.ConsumeState;
import org.apache.inlong.sdk.sort.entity.ConsumeStatusParams;
import org.apache.inlong.sdk.sort.entity.ConsumeStatusResult;
import org.apache.inlong.sdk.sort.entity.HeartBeatParams;
import org.apache.inlong.sdk.sort.entity.HeartBeatResult;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.util.PeriodicTask;
public class ManagerReporter extends PeriodicTask {
private final ConcurrentHashMap<Integer, Long> reportApiRunTimeMs = new
ConcurrentHashMap<>();
private final ClientContext context;
- private final InlongTopicManager inLongTopicManager;
+ private final TopicManager inLongTopicManager;
private final ManagerReportHandler reportHandler;
private Map<Integer, Long> reportApiInterval = new HashMap<>();
@@ -53,8 +54,7 @@ public class ManagerReporter extends PeriodicTask {
* @param runInterval long
* @param timeUnit TimeUnit
*/
- public ManagerReporter(ClientContext context, ManagerReportHandler
reportHandler,
- InlongTopicManager inLongTopicManager,
+ public ManagerReporter(ClientContext context, ManagerReportHandler
reportHandler, TopicManager inLongTopicManager,
long runInterval, TimeUnit timeUnit) {
super(runInterval, timeUnit, context.getConfig());
this.context = context;
@@ -158,17 +158,17 @@ public class ManagerReporter extends PeriodicTask {
consumeStatusParams.setSubscribedId(context.getConfig().getSortTaskId());
consumeStatusParams.setIp(context.getConfig().getLocalIp());
List<ConsumeState> consumeStates = new ArrayList<>();
- Collection<InLongTopicFetcher> allFetchers =
+ Collection<TopicFetcher> allFetchers =
inLongTopicManager.getAllFetchers();
- for (InLongTopicFetcher fetcher : allFetchers) {
- ConsumeState consumeState = new ConsumeState();
- consumeState.setTopic(fetcher.getInLongTopic().getTopic());
-
consumeState.setTopicType(fetcher.getInLongTopic().getTopicType());
-
consumeState.setClusterId(fetcher.getInLongTopic().getInLongCluster().getClusterId());
-
consumeState.setConsumedDataSize(fetcher.getConsumedDataSize());
- consumeState.setAckOffset(fetcher.getAckedOffset());
-
consumeState.setPartition(fetcher.getInLongTopic().getPartitionId());
- consumeStates.add(consumeState);
+ for (TopicFetcher fetcher : allFetchers) {
+ for (InLongTopic topic : fetcher.getTopics()) {
+ ConsumeState consumeState = new ConsumeState();
+ consumeState.setTopic(topic.getTopic());
+ consumeState.setTopicType(topic.getTopicType());
+
consumeState.setClusterId(topic.getInLongCluster().getClusterId());
+ consumeState.setPartition(topic.getPartitionId());
+ consumeStates.add(consumeState);
+ }
}
consumeStatusParams.setConsumeStates(consumeStates);
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
index 753c39f60..a381b7910 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
@@ -19,18 +19,21 @@ package org.apache.inlong.sdk.sort.impl;
import org.apache.inlong.sdk.sort.api.Cleanable;
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
import org.apache.inlong.sdk.sort.api.ManagerReportHandler;
import org.apache.inlong.sdk.sort.api.MetricReporter;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicManager;
import org.apache.inlong.sdk.sort.exception.NotExistException;
+import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Deprecated
+/**
+ * New version of sort client.
+ */
public class SortClientImpl extends SortClient {
private final String logPrefix = "[" +
SortClientImpl.class.getSimpleName() + "] ";
@@ -40,7 +43,7 @@ public class SortClientImpl extends SortClient {
private final ClientContext context;
- private final InlongTopicManager inLongTopicManager;
+ private final TopicManager inLongTopicManager;
/**
* SortClient Constructor
@@ -51,7 +54,10 @@ public class SortClientImpl extends SortClient {
try {
this.sortClientConfig = sortClientConfig;
this.context = new ClientContextImpl(this.sortClientConfig, new
MetricReporterImpl(sortClientConfig));
- this.inLongTopicManager = new InlongTopicManagerImpl(context, new
QueryConsumeConfigImpl(context));
+
+ this.inLongTopicManager = InlongTopicManagerFactory
+ .createInLongTopicManager(sortClientConfig.getTopicType(),
+ context, new QueryConsumeConfigImpl(context));
} catch (Exception e) {
this.close();
throw e;
@@ -71,7 +77,10 @@ public class SortClientImpl extends SortClient {
try {
this.sortClientConfig = sortClientConfig;
this.context = new ClientContextImpl(this.sortClientConfig,
metricReporter);
- this.inLongTopicManager = new InlongTopicManagerImpl(context, new
QueryConsumeConfigImpl(context));
+ queryConsumeConfig.configure(context);
+ this.inLongTopicManager = InlongTopicManagerFactory
+ .createInLongTopicManager(sortClientConfig.getTopicType(),
+ context, queryConsumeConfig);
} catch (Exception e) {
e.printStackTrace();
this.close();
@@ -102,8 +111,8 @@ public class SortClientImpl extends SortClient {
public void ack(String msgKey, String msgOffset)
throws Exception {
logger.debug("ack:{} offset:{}", msgKey, msgOffset);
- InLongTopicFetcher inLongTopicFetcher = getFetcher(msgKey);
- inLongTopicFetcher.ack(msgOffset);
+ TopicFetcher topicFetcher = getFetcher(msgKey);
+ topicFetcher.ack(msgOffset);
}
/**
@@ -128,12 +137,12 @@ public class SortClientImpl extends SortClient {
return this.sortClientConfig;
}
- private InLongTopicFetcher getFetcher(String msgKey) throws
NotExistException {
- InLongTopicFetcher inLongTopicFetcher =
inLongTopicManager.getFetcher(msgKey);
- if (inLongTopicFetcher == null) {
+ private TopicFetcher getFetcher(String msgKey) throws NotExistException {
+ TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey);
+ if (topicFetcher == null) {
throw new NotExistException(msgKey + " not exist.");
}
- return inLongTopicFetcher;
+ return topicFetcher;
}
private boolean doClose(Cleanable c) {
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
deleted file mode 100644
index 3a730d082..000000000
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl;
-
-import org.apache.inlong.sdk.sort.api.Cleanable;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.ManagerReportHandler;
-import org.apache.inlong.sdk.sort.api.MetricReporter;
-import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.api.SortClient;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
-import org.apache.inlong.sdk.sort.api.TopicFetcher;
-import org.apache.inlong.sdk.sort.api.TopicManager;
-import org.apache.inlong.sdk.sort.exception.NotExistException;
-import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * New version of sort client.
- */
-public class SortClientImplV2 extends SortClient {
-
- private final String logPrefix = "[" +
SortClientImpl.class.getSimpleName() + "] ";
- private final Logger logger =
LoggerFactory.getLogger(SortClientImpl.class);
-
- private final SortClientConfig sortClientConfig;
-
- private final ClientContext context;
-
- private final TopicManager inLongTopicManager;
-
- /**
- * SortClient Constructor
- *
- * @param sortClientConfig SortClientConfig
- */
- public SortClientImplV2(SortClientConfig sortClientConfig) {
- try {
- this.sortClientConfig = sortClientConfig;
- this.context = new ClientContextImpl(this.sortClientConfig, new
MetricReporterImpl(sortClientConfig));
-
- this.inLongTopicManager = InlongTopicManagerFactory
- .createInLongTopicManager(sortClientConfig.getTopicType(),
- context, new QueryConsumeConfigImpl(context));
- } catch (Exception e) {
- this.close();
- throw e;
- }
- }
-
- /**
- * SortClient Constructor with user defined
QueryConsumeConfig,MetricReporter and ManagerReportHandler
- *
- * @param sortClientConfig SortClientConfig
- * @param queryConsumeConfig QueryConsumeConfig
- * @param metricReporter MetricReporter
- * @param managerReportHandler ManagerReportHandler
- */
- public SortClientImplV2(SortClientConfig sortClientConfig,
QueryConsumeConfig queryConsumeConfig,
- MetricReporter metricReporter, ManagerReportHandler
managerReportHandler) {
- try {
- this.sortClientConfig = sortClientConfig;
- this.context = new ClientContextImpl(this.sortClientConfig,
metricReporter);
- queryConsumeConfig.configure(context);
- this.inLongTopicManager = InlongTopicManagerFactory
- .createInLongTopicManager(sortClientConfig.getTopicType(),
- context, queryConsumeConfig);
- } catch (Exception e) {
- e.printStackTrace();
- this.close();
- throw e;
- }
- }
-
- /**
- * init SortClient
- *
- * @return true/false
- * @throws Throwable
- */
- @Override
- public boolean init() throws Throwable {
- logger.info(logPrefix + "init|" + sortClientConfig);
- return true;
- }
-
- /**
- * ack offset to msgKey
- *
- * @param msgKey String
- * @param msgOffset String
- * @throws Exception
- */
- @Override
- public void ack(String msgKey, String msgOffset)
- throws Exception {
- logger.debug("ack:{} offset:{}", msgKey, msgOffset);
- TopicFetcher topicFetcher = getFetcher(msgKey);
- topicFetcher.ack(msgOffset);
- }
-
- /**
- * close SortClient
- *
- * @return true/false
- */
- @Override
- public boolean close() {
- boolean cleanInLongTopicManager = doClose(inLongTopicManager);
- boolean cleanContext = doClose(context);
-
- logger.info(logPrefix
-
- + "|cleanInLongTopicManager=" + cleanInLongTopicManager
- + "|cleanContext=" + cleanContext);
- return (cleanInLongTopicManager && cleanContext);
- }
-
- @Override
- public SortClientConfig getConfig() {
- return this.sortClientConfig;
- }
-
- private TopicFetcher getFetcher(String msgKey) throws NotExistException {
- TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey);
- if (topicFetcher == null) {
- throw new NotExistException(msgKey + " not exist.");
- }
- return topicFetcher;
- }
-
- private boolean doClose(Cleanable c) {
- try {
- if (c != null) {
- return c.clean();
- }
- return true;
- } catch (Throwable th) {
- logger.error(logPrefix + "clean error.", th);
- return false;
- }
- }
-}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
deleted file mode 100644
index deffa3272..000000000
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl.kafka;
-
-import com.google.gson.Gson;
-
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.SeekerFactory;
-import org.apache.inlong.sdk.sort.api.SortClientConfig.ConsumeStrategy;
-import org.apache.inlong.sdk.sort.entity.InLongMessage;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.fetcher.kafka.AckOffsetOnRebalance;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-@Deprecated
-public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
-
- private final Logger logger =
LoggerFactory.getLogger(InLongKafkaFetcherImpl.class);
- private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata>
commitOffsetMap = new ConcurrentHashMap<>();
- private final AtomicLong ackOffsets = new AtomicLong(0);
- private volatile boolean stopConsume = false;
- private String bootstrapServers;
- private KafkaConsumer<byte[], byte[]> consumer;
-
- public InLongKafkaFetcherImpl(InLongTopic inLongTopic, ClientContext
context) {
- super(inLongTopic, context);
- }
-
- @Override
- public boolean init(Object object) {
- String bootstrapServers = (String) object;
- try {
- createKafkaConsumer(bootstrapServers);
- if (consumer != null) {
- logger.info("start to subscribe topic:{}", new
Gson().toJson(inLongTopic));
- this.seeker = SeekerFactory.createKafkaSeeker(consumer,
inLongTopic);
-
consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
- new
AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), seeker,
- commitOffsetMap, consumer));
- } else {
- logger.info("consumer is null");
- return false;
- }
- this.bootstrapServers = bootstrapServers;
- String threadName = String.format("sort_sdk_fetch_thread_%s_%s_%d",
- this.inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic(), this.hashCode());
- this.fetchThread = new Thread(new Fetcher(), threadName);
- logger.info("start to start thread:{}", threadName);
- this.fetchThread.start();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- @Override
- public void ack(String msgOffset) throws Exception {
- String[] offset = msgOffset.split(":");
- if (offset.length == 2) {
- TopicPartition topicPartition = new
TopicPartition(inLongTopic.getTopic(), Integer.parseInt(offset[0]));
- OffsetAndMetadata offsetAndMetadata = new
OffsetAndMetadata(Long.parseLong(offset[1]));
- commitOffsetMap.put(topicPartition, offsetAndMetadata);
- } else {
- throw new Exception("offset is illegal, the correct format is
int:long ,the error offset is:" + msgOffset);
- }
- }
-
- @Override
- public void pause() {
- this.stopConsume = true;
- }
-
- @Override
- public void resume() {
- this.stopConsume = false;
- }
-
- @Override
- public boolean close() {
- this.closed = true;
- try {
- if (fetchThread != null) {
- fetchThread.interrupt();
- }
- if (consumer != null) {
- consumer.close();
- }
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
- logger.info("closed {}", inLongTopic);
- return true;
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public void stopConsume(boolean stopConsume) {
- this.stopConsume = stopConsume;
- }
-
- @Override
- public boolean isConsumeStop() {
- return this.stopConsume;
- }
-
- @Override
- public InLongTopic getInLongTopic() {
- return inLongTopic;
- }
-
- @Override
- public long getConsumedDataSize() {
- return 0;
- }
-
- @Override
- public long getAckedOffset() {
- return 0;
- }
-
- private void createKafkaConsumer(String bootstrapServers) {
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,
context.getConfig().getSortTaskId());
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
- context.getConfig().getKafkaSocketRecvBufferSize());
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- ConsumeStrategy offsetResetStrategy =
context.getConfig().getOffsetResetStrategy();
- if (offsetResetStrategy == ConsumeStrategy.lastest
- || offsetResetStrategy == ConsumeStrategy.lastest_absolutely) {
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- } else if (offsetResetStrategy == ConsumeStrategy.earliest
- || offsetResetStrategy == ConsumeStrategy.earliest_absolutely)
{
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
- } else {
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
- }
- properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
- context.getConfig().getKafkaFetchSizeBytes());
- properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
- context.getConfig().getKafkaFetchWaitMs());
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
- RangeAssignor.class.getName());
- properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L);
- this.bootstrapServers = bootstrapServers;
- logger.info("start to create kafka consumer:{}", properties);
- this.consumer = new KafkaConsumer<>(properties);
- logger.info("end to create kafka consumer:{}", consumer);
- }
-
- public class Fetcher implements Runnable {
-
- private void commitKafkaOffset() {
- if (consumer != null && commitOffsetMap.size() > 0) {
- try {
- consumer.commitSync(commitOffsetMap);
- commitOffsetMap.clear();
- // TODO monitor commit succ
-
- } catch (Exception e) {
- // TODO monitor commit fail
- logger.error(e.getMessage(), e);
- }
- }
- }
-
- /**
- * put the received msg to onFinished method
- *
- * @param messageRecords {@link List < MessageRecord >}
- */
- private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
- long start = System.currentTimeMillis();
- try {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimes(1);
-
context.getConfig().getCallback().onFinishedBatch(messageRecords);
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimeCost(System.currentTimeMillis() -
start).addCallbackDoneTimes(1);
- } catch (Exception e) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackErrorTimes(1);
- logger.error(e.getMessage(), e);
- }
- }
-
- private String getOffset(int partitionId, long offset) {
- return partitionId + ":" + offset;
- }
-
- private Map<String, String> getMsgHeaders(Headers headers) {
- Map<String, String> headerMap = new HashMap<>();
- for (Header header : headers) {
- headerMap.put(header.key(), new String(header.value()));
- }
- return headerMap;
- }
-
- @Override
- public void run() {
- boolean hasPermit;
- while (true) {
- hasPermit = false;
- try {
- if (context.getConfig().isStopConsume() || stopConsume) {
- TimeUnit.MILLISECONDS.sleep(50);
- continue;
- }
-
- if (sleepTime > 0) {
- TimeUnit.MILLISECONDS.sleep(sleepTime);
- }
-
- context.acquireRequestPermit();
- hasPermit = true;
- // fetch from kafka
- fetchFromKafka();
- // commit
- commitKafkaOffset();
- } catch (Exception e) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addFetchErrorTimes(1);
- logger.error(e.getMessage(), e);
- } finally {
- if (hasPermit) {
- context.releaseRequestPermit();
- }
- }
- }
- }
-
- private void fetchFromKafka() throws Exception {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addMsgCount(1).addFetchTimes(1);
-
- long startFetchTime = System.currentTimeMillis();
- ConsumerRecords<byte[], byte[]> records = consumer
-
.poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs()));
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addFetchTimeCost(System.currentTimeMillis() -
startFetchTime);
- if (null != records && !records.isEmpty()) {
-
- for (ConsumerRecord<byte[], byte[]> msg : records) {
- List<MessageRecord> msgs = new ArrayList<>();
- String offsetKey = getOffset(msg.partition(),
msg.offset());
- List<InLongMessage> inLongMessages = deserializer
- .deserialize(context, inLongTopic,
getMsgHeaders(msg.headers()), msg.value());
- inLongMessages = interceptor.intercept(inLongMessages);
- if (inLongMessages.isEmpty()) {
- ack(offsetKey);
- continue;
- }
-
- msgs.add(new MessageRecord(inLongTopic.getTopicKey(),
- inLongMessages,
- offsetKey, System.currentTimeMillis()));
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addConsumeSize(msg.value().length);
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addMsgCount(msgs.size());
- handleAndCallbackMsg(msgs);
- }
- sleepTime = 0L;
- } else {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addEmptyFetchTimes(1);
- emptyFetchTimes++;
- if (emptyFetchTimes >=
context.getConfig().getEmptyPollTimes()) {
- sleepTime = Math.min((sleepTime +=
context.getConfig().getEmptyPollSleepStepMs()),
- context.getConfig().getMaxEmptyPollSleepMs());
- emptyFetchTimes = 0;
- }
- }
- }
- }
-}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
deleted file mode 100644
index fddd9ec11..000000000
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.entity.InLongMessage;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.util.StringUtil;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
-
- private final Logger logger =
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
- private final ReentrantReadWriteLock mainLock = new
ReentrantReadWriteLock(true);
- private final ConcurrentHashMap<String, MessageId> offsetCache = new
ConcurrentHashMap<>();
- private Consumer<byte[]> consumer;
-
- public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
- ClientContext context) {
- super(inLongTopic, context);
- }
-
- @Override
- public void stopConsume(boolean stopConsume) {
- this.isStopConsume = stopConsume;
- }
-
- @Override
- public boolean isConsumeStop() {
- return isStopConsume;
- }
-
- @Override
- public InLongTopic getInLongTopic() {
- return inLongTopic;
- }
-
- @Override
- public long getConsumedDataSize() {
- return 0L;
- }
-
- @Override
- public long getAckedOffset() {
- return 0L;
- }
-
- private void ackSucc(String offset) {
- offsetCache.remove(offset);
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckSuccTimes(1L);
- }
-
- /**
- * ack Offset
- *
- * @param msgOffset String
- */
- @Override
- public void ack(String msgOffset) throws Exception {
- if (!StringUtils.isEmpty(msgOffset)) {
- try {
- if (consumer == null) {
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addAckFailTimes(1L);
- logger.error("consumer == null {}", inLongTopic);
- return;
- }
- MessageId messageId = offsetCache.get(msgOffset);
- if (messageId == null) {
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addAckFailTimes(1L);
- logger.error("messageId == null {}", inLongTopic);
- return;
- }
- consumer.acknowledgeAsync(messageId)
- .thenAccept(consumer -> ackSucc(msgOffset))
- .exceptionally(exception -> {
- logger.error("ack fail:{} {},error:{}",
- inLongTopic, msgOffset,
exception.getMessage(), exception);
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addAckFailTimes(1L);
- return null;
- });
- } catch (Exception e) {
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckFailTimes(1L);
- logger.error(e.getMessage(), e);
- throw e;
- }
- }
- }
-
- /**
- * create Consumer and fetch thread
- *
- * @return boolean
- */
- @Override
- public boolean init(Object object) {
- PulsarClient pulsarClient = (PulsarClient) object;
- return createConsumer(pulsarClient);
- }
-
- private boolean createConsumer(PulsarClient client) {
- if (null == client) {
- return false;
- }
- try {
- consumer = client.newConsumer(Schema.BYTES)
- .topic(inLongTopic.getTopic())
- .subscriptionName(context.getConfig().getSortTaskId())
- .subscriptionType(SubscriptionType.Shared)
- .startMessageIdInclusive()
- .ackTimeout(context.getConfig().getAckTimeoutSec(),
TimeUnit.SECONDS)
-
.receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
- .subscribe();
-
- String threadName = "sort_sdk_fetch_thread_" +
StringUtil.formatDate(new Date());
- this.fetchThread = new Thread(new Fetcher(), threadName);
- this.fetchThread.start();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- /**
- * pause
- */
- @Override
- public void pause() {
- if (consumer != null) {
- consumer.pause();
- }
- }
-
- /**
- * resume
- */
- @Override
- public void resume() {
- if (consumer != null) {
- consumer.resume();
- }
- }
-
- /**
- * close
- *
- * @return true/false
- */
- @Override
- public boolean close() {
- mainLock.writeLock().lock();
- try {
- try {
- if (consumer != null) {
- consumer.close();
- }
- if (fetchThread != null) {
- fetchThread.interrupt();
- }
- } catch (PulsarClientException e) {
- e.printStackTrace();
- }
- logger.info("closed {}", inLongTopic);
- return true;
- } finally {
- this.closed = true;
- mainLock.writeLock().unlock();
- }
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- public class Fetcher implements Runnable {
-
- /**
- * put the received msg to onFinished method
- *
- * @param messageRecords {@link List}
- */
- private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
- long start = System.currentTimeMillis();
- try {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimes(1L);
-
context.getConfig().getCallback().onFinishedBatch(messageRecords);
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimeCost(System.currentTimeMillis() -
start).addCallbackDoneTimes(1L);
- } catch (Exception e) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackErrorTimes(1L);
- e.printStackTrace();
- }
- }
-
- private String getOffset(MessageId msgId) {
- return Base64.getEncoder().encodeToString(msgId.toByteArray());
- }
-
- @Override
- public void run() {
- boolean hasPermit;
- while (true) {
- hasPermit = false;
- try {
- if (context.getConfig().isStopConsume() || isStopConsume) {
- TimeUnit.MILLISECONDS.sleep(50);
- continue;
- }
-
- if (sleepTime > 0) {
- TimeUnit.MILLISECONDS.sleep(sleepTime);
- }
-
- context.acquireRequestPermit();
- hasPermit = true;
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addMsgCount(1L).addFetchTimes(1L);
-
- long startFetchTime = System.currentTimeMillis();
- Messages<byte[]> messages = consumer.batchReceive();
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addFetchTimeCost(System.currentTimeMillis() -
startFetchTime);
- if (null != messages && messages.size() != 0) {
- for (Message<byte[]> msg : messages) {
- List<MessageRecord> msgs = new ArrayList<>();
- String offsetKey = getOffset(msg.getMessageId());
- offsetCache.put(offsetKey, msg.getMessageId());
-
- List<InLongMessage> inLongMessages = deserializer
- .deserialize(context, inLongTopic,
msg.getProperties(), msg.getData());
-
- msgs.add(new
MessageRecord(inLongTopic.getTopicKey(),
- inLongMessages,
- offsetKey, System.currentTimeMillis()));
- context.getStatManager()
-
.getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addConsumeSize(msg.getData().length);
- context.getStatManager()
-
.getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addMsgCount(msgs.size());
- handleAndCallbackMsg(msgs);
- }
- sleepTime = 0L;
- } else {
- context.getStatManager()
-
.getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addEmptyFetchTimes(1L);
- emptyFetchTimes++;
- if (emptyFetchTimes >=
context.getConfig().getEmptyPollTimes()) {
- sleepTime = Math.min((sleepTime +=
context.getConfig().getEmptyPollSleepStepMs()),
-
context.getConfig().getMaxEmptyPollSleepMs());
- emptyFetchTimes = 0;
- }
- }
- } catch (Exception e) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addFetchErrorTimes(1L);
- logger.error(e.getMessage(), e);
- } finally {
- if (hasPermit) {
- context.releaseRequestPermit();
- }
- }
-
- if (closed) {
- break;
- }
- }
- }
- }
-}
\ No newline at end of file
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
deleted file mode 100644
index f4d2f3624..000000000
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl.tube;
-
-import com.google.common.base.Splitter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.SysConstants;
-import org.apache.inlong.sdk.sort.entity.InLongMessage;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
-import org.apache.inlong.sdk.sort.util.StringUtil;
-import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
-import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.corebase.Message;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class InLongTubeFetcherImpl extends InLongTopicFetcher {
-
- private static final Logger LOG =
LoggerFactory.getLogger(InLongTubeFetcherImpl.class);
- private PullMessageConsumer messageConsumer;
- private volatile Thread fetchThread;
-
- public InLongTubeFetcherImpl(InLongTopic inLongTopic, ClientContext
context) {
- super(inLongTopic, context);
- }
-
- @Override
- public boolean init(Object object) {
- TubeConsumerCreator tubeConsumerCreator = (TubeConsumerCreator) object;
- TubeClientConfig tubeClientConfig =
tubeConsumerCreator.getTubeClientConfig();
- try {
- ConsumerConfig consumerConfig = new
ConsumerConfig(tubeClientConfig.getMasterInfo(),
- context.getConfig().getSortTaskId());
-
- messageConsumer =
tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig);
- if (messageConsumer != null) {
- TreeSet<String> filters = null;
- if (inLongTopic.getProperties() != null &&
inLongTopic.getProperties().containsKey(
- SysConstants.TUBE_TOPIC_FILTER_KEY)) {
- String filterStr =
inLongTopic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY);
- String[] filterArray = filterStr.split(" ");
- filters = new TreeSet<>(Arrays.asList(filterArray));
- }
- messageConsumer.subscribe(inLongTopic.getTopic(), filters);
- messageConsumer.completeSubscribe();
-
- String threadName = "sort_sdk_fetch_thread_" +
StringUtil.formatDate(new Date());
- this.fetchThread = new Thread(new Fetcher(), threadName);
- this.fetchThread.start();
- } else {
- return false;
- }
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
-
- @Override
- public void ack(String msgOffset) throws Exception {
- if (!StringUtils.isEmpty(msgOffset)) {
- if (messageConsumer == null) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addAckFailTimes(1L);
- LOG.warn("consumer == null");
- return;
- }
-
- try {
- ConsumerResult consumerResult =
messageConsumer.confirmConsume(msgOffset, true);
- int errCode = consumerResult.getErrCode();
- if (TErrCodeConstants.SUCCESS != errCode) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addAckFailTimes(1L);
- } else {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addAckSuccTimes(1L);
- }
- } catch (Exception e) {
-
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckFailTimes(1L);
- LOG.error(e.getMessage(), e);
- throw e;
- }
- }
- }
-
- @Override
- public void pause() {
- this.closed = true;
- }
-
- @Override
- public void resume() {
- this.closed = false;
- }
-
- @Override
- public boolean close() {
- try {
- if (fetchThread != null) {
- fetchThread.interrupt();
- }
- if (messageConsumer != null) {
- messageConsumer.shutdown();
- }
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- } finally {
- this.closed = true;
- }
- LOG.info("closed {}", inLongTopic);
- return true;
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public void stopConsume(boolean stopConsume) {
- this.isStopConsume = stopConsume;
- }
-
- @Override
- public boolean isConsumeStop() {
- return isStopConsume;
- }
-
- @Override
- public InLongTopic getInLongTopic() {
- return inLongTopic;
- }
-
- @Override
- public long getConsumedDataSize() {
- return 0L;
- }
-
- @Override
- public long getAckedOffset() {
- return 0L;
- }
-
- public class Fetcher implements Runnable {
-
- /**
- * put the received msg to onFinished method
- *
- * @param messageRecord {@link MessageRecord}
- */
- private void handleAndCallbackMsg(MessageRecord messageRecord) {
- long start = System.currentTimeMillis();
- try {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimes(1L);
-
context.getConfig().getCallback().onFinishedBatch(Collections.singletonList(messageRecord));
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimeCost(System.currentTimeMillis() -
start).addCallbackDoneTimes(1L);
- } catch (Exception e) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackErrorTimes(1L);
- e.printStackTrace();
- }
- }
-
- /**
- * parseAttr from k1=v1&k2=v2 to kv map
- *
- * @param splitter {@link Splitter}
- * @param attr String
- * @param entrySplitterStr String
- * @return {@link Map}
- */
- private Map<String, String> parseAttr(Splitter splitter, String attr,
String entrySplitterStr) {
- Map<String, String> map = new HashMap<>();
- for (String s : splitter.split(attr)) {
- int idx = s.indexOf(entrySplitterStr);
- String k = s;
- String v = null;
- if (idx > 0) {
- k = s.substring(0, idx);
- v = s.substring(idx + 1);
- }
- map.put(k, v);
- }
- return map;
- }
-
- private Map<String, String> getAttributeMap(String attribute) {
- final Splitter splitter = Splitter.on("&");
- return parseAttr(splitter, attribute, "=");
- }
-
- @Override
- public void run() {
- boolean hasPermit;
- while (true) {
- hasPermit = false;
- try {
- if (context.getConfig().isStopConsume() || isStopConsume) {
- TimeUnit.MILLISECONDS.sleep(50L);
- continue;
- }
-
- if (sleepTime > 0) {
- TimeUnit.MILLISECONDS.sleep(sleepTime);
- }
-
- context.acquireRequestPermit();
- hasPermit = true;
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addMsgCount(1L).addFetchTimes(1L);
-
- long startFetchTime = System.currentTimeMillis();
- ConsumerResult message = messageConsumer.getMessage();
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addFetchTimeCost(System.currentTimeMillis() -
startFetchTime);
- if (null != message && TErrCodeConstants.SUCCESS ==
message.getErrCode()) {
- for (Message msg : message.getMessageList()) {
- List<InLongMessage> msgs = new ArrayList<>();
- List<InLongMessage> deserialize = deserializer
- .deserialize(context, inLongTopic,
getAttributeMap(msg.getAttribute()),
- msg.getData());
- deserialize = interceptor.intercept(deserialize);
- if (deserialize.isEmpty()) {
- continue;
- }
- msgs.addAll(deserialize);
- context.getStatManager()
-
.getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-
.addMsgCount(deserialize.size()).addConsumeSize(msg.getData().length);
- handleAndCallbackMsg(new
MessageRecord(inLongTopic.getTopicKey(), msgs,
- message.getConfirmContext(),
System.currentTimeMillis()));
- }
-
- sleepTime = 0L;
- } else {
- context.getStatManager()
-
.getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addEmptyFetchTimes(1L);
- emptyFetchTimes++;
- if (emptyFetchTimes >=
context.getConfig().getEmptyPollTimes()) {
- sleepTime = Math.min((sleepTime +=
context.getConfig().getEmptyPollSleepStepMs()),
-
context.getConfig().getMaxEmptyPollSleepMs());
- emptyFetchTimes = 0;
- }
- }
- } catch (Exception e) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
-
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addFetchErrorTimes(1L);
- LOG.error(e.getMessage(), e);
- } finally {
- if (hasPermit) {
- context.releaseRequestPermit();
- }
- }
-
- if (closed) {
- break;
- }
- }
- }
- }
-}
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
index 363a8244f..52011b3c6 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
@@ -24,12 +24,12 @@ import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -45,7 +45,7 @@ public class InlongTopicManagerImplTest {
private InLongTopic inLongTopic;
private ClientContext clientContext;
private QueryConsumeConfig queryConsumeConfig;
- private InlongTopicManager inLongTopicManager;
+ private InlongSingleTopicManager inLongTopicManager;
{
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
@@ -66,40 +66,40 @@ public class InlongTopicManagerImplTest {
when(sortClientConfig.getSortTaskId()).thenReturn("test");
when(sortClientConfig.getUpdateMetaDataIntervalSec()).thenReturn(60);
queryConsumeConfig = PowerMockito.mock(QueryConsumeConfigImpl.class);
- inLongTopicManager = new InlongTopicManagerImpl(clientContext,
queryConsumeConfig);
+ inLongTopicManager = new InlongSingleTopicManager(clientContext,
queryConsumeConfig);
}
@Test
public void testAddFetcher() {
- InlongTopicManager inLongTopicManager = new
InlongTopicManagerImpl(clientContext, queryConsumeConfig);
+ InlongSingleTopicManager inLongTopicManager = new
InlongSingleTopicManager(clientContext, queryConsumeConfig);
- InLongTopicFetcher inLongTopicFetcher =
inLongTopicManager.addFetcher(inLongTopic);
+ TopicFetcher inLongTopicFetcher =
inLongTopicManager.addTopic(inLongTopic);
Assert.assertNull(inLongTopicFetcher);
}
@Test
public void testRemoveFetcher() {
- InLongTopicFetcher inLongTopicFetcher =
inLongTopicManager.removeFetcher(inLongTopic, true);
+ TopicFetcher inLongTopicFetcher =
inLongTopicManager.removeTopic(inLongTopic, true);
Assert.assertNull(inLongTopicFetcher);
- ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new
ConcurrentHashMap<>();
- InLongTopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(InLongTopicFetcher.class);
+ ConcurrentHashMap<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
+ TopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(TopicFetcher.class);
fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
- inLongTopicFetcher = inLongTopicManager.removeFetcher(inLongTopic,
true);
+ inLongTopicFetcher = inLongTopicManager.removeTopic(inLongTopic, true);
Assert.assertNotNull(inLongTopicFetcher);
}
@Test
public void testGetFetcher() {
- InLongTopicFetcher fetcher =
inLongTopicManager.getFetcher(inLongTopic.getTopicKey());
+ TopicFetcher fetcher =
inLongTopicManager.getFetcher(inLongTopic.getTopicKey());
Assert.assertNull(fetcher);
- ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new
ConcurrentHashMap<>();
- InLongTopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(InLongTopicFetcher.class);
+ ConcurrentHashMap<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
+ TopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(TopicFetcher.class);
fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
@@ -114,8 +114,8 @@ public class InlongTopicManagerImplTest {
Set<String> managedInLongTopics =
inLongTopicManager.getManagedInLongTopics();
Assert.assertEquals(0, managedInLongTopics.size());
- ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new
ConcurrentHashMap<>();
- InLongTopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(InLongTopicFetcher.class);
+ ConcurrentHashMap<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
+ TopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(TopicFetcher.class);
fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
managedInLongTopics = inLongTopicManager.getManagedInLongTopics();
@@ -125,11 +125,11 @@ public class InlongTopicManagerImplTest {
@Test
public void testGetAllFetchers() {
- Collection<InLongTopicFetcher> allFetchers =
inLongTopicManager.getAllFetchers();
+ Collection<TopicFetcher> allFetchers =
inLongTopicManager.getAllFetchers();
Assert.assertEquals(0, allFetchers.size());
- ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new
ConcurrentHashMap<>();
- InLongTopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(InLongTopicFetcher.class);
+ ConcurrentHashMap<String, TopicFetcher> fetchers = new
ConcurrentHashMap<>();
+ TopicFetcher inLongTopicFetcherRmMock =
PowerMockito.mock(TopicFetcher.class);
fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
allFetchers = inLongTopicManager.getAllFetchers();
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
index 815f1047c..4fc965625 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
@@ -17,9 +17,6 @@
package org.apache.inlong.sdk.sort.impl.decode;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.powermock.api.mockito.PowerMockito.when;
-
import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.List;
@@ -29,13 +26,10 @@ import
org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
import org.apache.inlong.sdk.sort.util.Utils;
import org.junit.Assert;
import org.junit.Test;
@@ -52,16 +46,12 @@ public class MessageDeserializerTest {
private InLongTopic inLongTopic;
private String testData;
private MessageObjs messageObjs;
- private SortClientConfig sortClientConfig;
- private StatManager statManager;
private void setUp() throws Exception {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
messageDeserializer = new MessageDeserializer();
headers = new HashMap<>();
context = PowerMockito.mock(ClientContextImpl.class);
- sortClientConfig = PowerMockito.mock(SortClientConfig.class);
- statManager = PowerMockito.mock(StatManager.class);
inLongTopic = new InLongTopic();
inLongTopic.setTopic("testTopic");
@@ -69,13 +59,6 @@ public class MessageDeserializerTest {
inLongTopic.setInLongCluster(cacheZoneCluster);
inLongTopic.setProperties(new HashMap<>());
- when(context.getConfig()).thenReturn(sortClientConfig);
- when(context.getStatManager()).thenReturn(statManager);
- SortClientStateCounter sortClientStateCounter = new
SortClientStateCounter("sortTaskId",
- cacheZoneCluster.getClusterId(),
- inLongTopic.getTopic(), 0);
- when(statManager.getStatistics(anyString(), anyString(),
anyString())).thenReturn(sortClientStateCounter);
- when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
}
@Test
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
index 8d3a4b5e3..88b1ed7b2 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
@@ -17,16 +17,14 @@
package org.apache.inlong.sdk.sort.impl.kafka;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.powermock.api.mockito.PowerMockito.when;
-
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaSingleTopicFetcher;
import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
+import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
+import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -45,8 +43,7 @@ public class InLongKafkaFetcherImplTest {
private ClientContext clientContext;
private InLongTopic inLongTopic;
- private SortClientConfig sortClientConfig;
- private StatManager statManager;
+ private static final String TEST_BOOTSTRAP = "testBootstrap";
/**
* setUp
@@ -65,34 +62,26 @@ public class InLongKafkaFetcherImplTest {
inLongTopic.setInLongCluster(cacheZoneCluster);
clientContext = PowerMockito.mock(ClientContextImpl.class);
- sortClientConfig = PowerMockito.mock(SortClientConfig.class);
- statManager = PowerMockito.mock(StatManager.class);
-
- when(clientContext.getConfig()).thenReturn(sortClientConfig);
- when(clientContext.getStatManager()).thenReturn(statManager);
- SortClientStateCounter sortClientStateCounter = new
SortClientStateCounter("sortTaskId",
- cacheZoneCluster.getClusterId(),
- inLongTopic.getTopic(), 0);
- when(statManager.getStatistics(anyString(), anyString(),
anyString())).thenReturn(sortClientStateCounter);
- when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
-
}
@Test
public void pause() {
- InLongKafkaFetcherImpl inLongTopicFetcher = new
InLongKafkaFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
KafkaSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(),
TEST_BOOTSTRAP);
inLongTopicFetcher.pause();
}
@Test
public void resume() {
- InLongKafkaFetcherImpl inLongTopicFetcher = new
InLongKafkaFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
KafkaSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(),
TEST_BOOTSTRAP);
inLongTopicFetcher.resume();
}
@Test
public void close() {
- InLongKafkaFetcherImpl inLongTopicFetcher = new
InLongKafkaFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
KafkaSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(),
TEST_BOOTSTRAP);
boolean close = inLongTopicFetcher.close();
Assert.assertTrue(close);
}
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index 935387f9d..202e8b468 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -27,13 +27,14 @@ import static org.powermock.api.mockito.PowerMockito.when;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarSingleTopicFetcher;
import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
+import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
+import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
@@ -56,7 +57,6 @@ public class InLongPulsarFetcherImplTest {
private ClientContext clientContext;
private InLongTopic inLongTopic;
private SortClientConfig sortClientConfig;
- private StatManager statManager;
/**
* setUp
@@ -76,52 +76,35 @@ public class InLongPulsarFetcherImplTest {
clientContext = PowerMockito.mock(ClientContextImpl.class);
sortClientConfig = PowerMockito.mock(SortClientConfig.class);
- statManager = PowerMockito.mock(StatManager.class);
when(clientContext.getConfig()).thenReturn(sortClientConfig);
- when(clientContext.getStatManager()).thenReturn(statManager);
- SortClientStateCounter sortClientStateCounter = new
SortClientStateCounter("sortTaskId",
- cacheZoneCluster.getClusterId(),
- inLongTopic.getTopic(), 0);
- when(statManager.getStatistics(anyString(), anyString(),
anyString())).thenReturn(sortClientStateCounter);
when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
}
@Test
public void stopConsume() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
- boolean consumeStop = inLongTopicFetcher.isConsumeStop();
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(), null);
+ boolean consumeStop = inLongTopicFetcher.isStopConsume();
Assert.assertFalse(consumeStop);
- inLongTopicFetcher.stopConsume(true);
- consumeStop = inLongTopicFetcher.isConsumeStop();
+ inLongTopicFetcher.setStopConsume(true);
+ consumeStop = inLongTopicFetcher.isStopConsume();
Assert.assertTrue(consumeStop);
}
@Test
public void getInLongTopic() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
- InLongTopic inLongTopic = inLongTopicFetcher.getInLongTopic();
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(), null);
+ InLongTopic inLongTopic = inLongTopicFetcher.getTopics().get(0);
Assert.assertEquals(inLongTopic.getInLongCluster(),
inLongTopic.getInLongCluster());
}
- @Test
- public void getConsumedDataSize() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
- long consumedDataSize = inLongTopicFetcher.getConsumedDataSize();
- Assert.assertEquals(0L, consumedDataSize);
- }
-
- @Test
- public void getAckedOffset() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
- long ackedOffset = inLongTopicFetcher.getAckedOffset();
- Assert.assertEquals(0L, ackedOffset);
- }
-
@Test
public void ack() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(), null);
MessageId messageId = PowerMockito.mock(MessageId.class);
ConcurrentHashMap<String, MessageId> offsetCache = new
ConcurrentHashMap<>();
offsetCache.put("test", messageId);
@@ -137,10 +120,11 @@ public class InLongPulsarFetcherImplTest {
@Test
public void init() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
PulsarClient pulsarClient = PowerMockito.mock(PulsarClient.class);
ConsumerBuilder consumerBuilder =
PowerMockito.mock(ConsumerBuilder.class);
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(),
pulsarClient);
try {
when(pulsarClient.newConsumer(any())).thenReturn(consumerBuilder);
when(consumerBuilder.topic(anyString())).thenReturn(consumerBuilder);
@@ -156,7 +140,7 @@ public class InLongPulsarFetcherImplTest {
Consumer consumer = PowerMockito.mock(Consumer.class);
when(consumerBuilder.subscribe()).thenReturn(consumer);
doNothing().when(consumer).close();
- boolean init = inLongTopicFetcher.init(pulsarClient);
+ boolean init = inLongTopicFetcher.init();
inLongTopicFetcher.close();
Assert.assertTrue(init);
} catch (Exception e) {
@@ -166,19 +150,22 @@ public class InLongPulsarFetcherImplTest {
@Test
public void pause() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(), null);
inLongTopicFetcher.pause();
}
@Test
public void resume() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(), null);
inLongTopicFetcher.resume();
}
@Test
public void close() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+ new MsgTimeInterceptor(), new MessageDeserializer(), null);
boolean close = inLongTopicFetcher.close();
Assert.assertTrue(close);
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
index 3e35f43f6..dde55f53b 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
@@ -17,17 +17,15 @@
package org.apache.inlong.sdk.sort.impl.tube;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeSingleTopicFetcher;
import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -40,7 +38,6 @@ public class InLongTubeFetcherImplTest {
private ClientContext clientContext;
private InLongTopic inLongTopic;
private SortClientConfig sortClientConfig;
- private StatManager statManager;
/**
* setUp
@@ -60,33 +57,27 @@ public class InLongTubeFetcherImplTest {
clientContext = PowerMockito.mock(ClientContextImpl.class);
sortClientConfig = PowerMockito.mock(SortClientConfig.class);
- statManager = PowerMockito.mock(StatManager.class);
when(clientContext.getConfig()).thenReturn(sortClientConfig);
- when(clientContext.getStatManager()).thenReturn(statManager);
- SortClientStateCounter sortClientStateCounter = new
SortClientStateCounter("sortTaskId",
- cacheZoneCluster.getClusterId(),
- inLongTopic.getTopic(), 0);
- when(statManager.getStatistics(anyString(), anyString(),
anyString())).thenReturn(sortClientStateCounter);
when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
}
@Test
public void pause() {
- InLongTubeFetcherImpl inLongTopicFetcher = new
InLongTubeFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null);
inLongTopicFetcher.pause();
}
@Test
public void resume() {
- InLongTubeFetcherImpl inLongTopicFetcher = new
InLongTubeFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null);
inLongTopicFetcher.resume();
}
@Test
public void close() {
- InLongTopicFetcher inLongTopicFetcher = new
InLongTubeFetcherImpl(inLongTopic, clientContext);
+ TopicFetcher inLongTopicFetcher = new
TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null);
boolean close = inLongTopicFetcher.close();
Assert.assertTrue(close);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index f00a1a03b..b74ad9c6b 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -196,14 +196,14 @@ public final class SortSdkSource extends AbstractSource
if
(SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
LOG.info("Create sort sdk client in file way:{}", configType);
ClassResourceQueryConsumeConfig queryConfig = new
ClassResourceQueryConsumeConfig();
- client = SortClientFactory.createSortClientV2(clientConfig,
+ client = SortClientFactory.createSortClient(clientConfig,
queryConfig,
new MetricReporterImpl(clientConfig),
new ManagerReportHandlerImpl());
} else if
(SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
LOG.info("Create sort sdk client in manager way:{}",
configType);
clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
- client = SortClientFactory.createSortClientV2(clientConfig);
+ client = SortClientFactory.createSortClient(clientConfig);
} else {
LOG.info("Create sort sdk client in custom way:{}",
configType);
// user-defined
@@ -218,7 +218,7 @@ public final class SortSdkSource extends AbstractSource
return null;
}
// if it specifies the type of QueryConsumeConfig.
- client = SortClientFactory.createSortClientV2(clientConfig,
+ client = SortClientFactory.createSortClient(clientConfig,
(QueryConsumeConfig) loaderObject,
new MetricReporterImpl(clientConfig),
new ManagerReportHandlerImpl());