This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f340ce5  [INLONG-1931][Feature][sort-sdk] sort-sdk support consume tube events from cachezone (#2186)
f340ce5 is described below

commit f340ce59ff10bf05df47a75be7e4855ea88cac1b
Author: wardli <[email protected]>
AuthorDate: Thu Jan 20 20:19:14 2022 +0800

    [INLONG-1931][Feature][sort-sdk] sort-sdk support consume tube events from cachezone (#2186)
---
 inlong-sdk/sort-sdk/pom.xml                        |   6 +
 .../inlong/sdk/sort/api/InLongTopicFetcher.java    |   5 +
 .../apache/inlong/sdk/sort/api/SysConstants.java   |  26 ++
 .../inlong/sdk/sort/entity/CacheZoneCluster.java   |   5 +
 .../apache/inlong/sdk/sort/entity/InLongTopic.java |  17 +-
 .../sdk/sort/impl/InLongTopicManagerImpl.java      | 167 +++++++++---
 .../sort/impl/pulsar/InLongPulsarFetcherImpl.java  |  44 ++-
 .../sdk/sort/impl/tube/InLongTubeFetcherImpl.java  | 300 +++++++++++++++++++++
 .../sdk/sort/impl/tube/TubeConsumerCreater.java    |  59 ++++
 9 files changed, 566 insertions(+), 63 deletions(-)

diff --git a/inlong-sdk/sort-sdk/pom.xml b/inlong-sdk/sort-sdk/pom.xml
index cb42251..42b0336 100644
--- a/inlong-sdk/sort-sdk/pom.xml
+++ b/inlong-sdk/sort-sdk/pom.xml
@@ -118,6 +118,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>tubemq-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
index e2ef03e..b113a82 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
@@ -23,6 +23,11 @@ public abstract class InLongTopicFetcher {
 
     protected InLongTopic inLongTopic;
     protected ClientContext context;
+    protected volatile boolean closed = false;
+    protected volatile boolean isStopConsume = false;
+    // use for empty topic to sleep
+    protected long sleepTime = 0L;
+    protected int emptyFetchTimes = 0;
 
     public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
         this.inLongTopic = inLongTopic;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SysConstants.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SysConstants.java
new file mode 100644
index 0000000..79635bc
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SysConstants.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sdk.sort.api;
+
+public class SysConstants {
+
+    public static final String TUBE_TOPIC_FILTER_KEY = "tube_topic_filter_key";
+
+}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
index 9616cf4..4bac733 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
@@ -71,4 +71,9 @@ public class CacheZoneCluster {
     public int hashCode() {
         return Objects.hash(clusterId);
     }
+
+    @Override
+    public String toString() {
+        return "CacheZoneCluster>>>" + clusterId + "|" + bootstraps + "|" + 
token;
+    }
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index 1ae5867..384f952 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sdk.sort.entity;
 
+import java.util.Map;
 import java.util.Objects;
 
 public class InLongTopic {
@@ -26,6 +27,7 @@ public class InLongTopic {
     private int partitionId;
     //pulsar,kafka,tube
     private String topicType;
+    private Map<String, Object> properties;
 
     public String getTopic() {
         return topic;
@@ -59,6 +61,14 @@ public class InLongTopic {
         this.topicType = topicType;
     }
 
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -84,11 +94,6 @@ public class InLongTopic {
 
     @Override
     public String toString() {
-        return "InLongTopic{"
-                + "topic='" + topic
-                + ", inLongCluster=" + cacheZoneCluster
-                + ", partitionId=" + partitionId
-                + ", topicType='" + topicType
-                + '}';
+        return "InLongTopic>>>" + topic + "|" + "|" + partitionId + "|" + 
topicType + "|" + cacheZoneCluster;
     }
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
index cfbd56c..2f310e5 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,8 +36,13 @@ import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
 import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
+import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl;
+import org.apache.inlong.sdk.sort.impl.tube.TubeConsumerCreater;
 import org.apache.inlong.sdk.sort.util.PeriodicTask;
 import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.slf4j.Logger;
@@ -49,6 +55,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
     private final ConcurrentHashMap<String, InLongTopicFetcher> fetchers
             = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, TubeConsumerCreater> tubeFactories 
= new ConcurrentHashMap<>();
 
     private final PeriodicTask updateMetaDataWorker;
     private volatile List<String> toBeSelectFetchers = new ArrayList<>();
@@ -69,26 +76,25 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
 
     @Override
     public InLongTopicFetcher addFetcher(InLongTopic inLongTopic) {
+
         try {
             InLongTopicFetcher result = 
fetchers.get(inLongTopic.getTopicKey());
             if (result == null) {
-                InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, context);
+                // create fetcher (pulsar,tube,kafka)
+                InLongTopicFetcher inLongTopicFetcher = 
createInLongTopicFetcher(inLongTopic);
                 InLongTopicFetcher preValue = 
fetchers.putIfAbsent(inLongTopic.getTopicKey(), inLongTopicFetcher);
                 logger.info("addFetcher :{}", inLongTopic.getTopicKey());
                 if (preValue != null) {
                     result = preValue;
-                    inLongTopicFetcher.close();
+                    if (inLongTopicFetcher != null) {
+                        inLongTopicFetcher.close();
+                    }
                     logger.info("addFetcher create same fetcher {}", 
inLongTopic);
                 } else {
                     result = inLongTopicFetcher;
-                    PulsarClient pulsarClient = 
pulsarClients.get(inLongTopic.getInLongCluster().getClusterId());
-                    if (null == pulsarClient) {
-                        logger.error("pulsar client is null:{}", 
inLongTopic.getInLongCluster().getClusterId());
-                        return null;
-                    }
-
-                    if (!result.init(pulsarClient)) {
-                        logger.info("addFetcher init fail:{}", 
inLongTopic.getTopicKey());
+                    if (result != null
+                            && 
!result.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()))) 
{
+                        logger.info("addFetcher init fail {}", 
inLongTopic.getTopicKey());
                         result.close();
                         result = null;
                     }
@@ -100,6 +106,25 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
         }
     }
 
+    /**
+     * create fetcher (pulsar,tube,kafka)
+     *
+     * @param inLongTopic {@link InLongTopic}
+     * @return {@link InLongTopicFetcher}
+     */
+    private InLongTopicFetcher createInLongTopicFetcher(InLongTopic 
inLongTopic) {
+        if 
(InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+            logger.info("the topic is pulsar {}", inLongTopic);
+            return new InLongPulsarFetcherImpl(inLongTopic, context);
+        } else if 
(InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+            logger.info("the topic is tube {}", inLongTopic);
+            return new InLongTubeFetcherImpl(inLongTopic, context);
+        } else {
+            logger.error("topic type not support " + 
inLongTopic.getTopicType());
+            return null;
+        }
+    }
+
     @Override
     public InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean 
closeFetcher) {
         InLongTopicFetcher result = fetchers.remove(inLongTopic.getTopicKey());
@@ -142,7 +167,6 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
 
     @Override
     public boolean clean() {
-        boolean result = false;
         String sortTaskId = context.getConfig().getSortTaskId();
         try {
             logger.info("start close {}", sortTaskId);
@@ -152,13 +176,15 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
             }
 
             closeFetcher();
-            result = true;
-            logger.info("close finished {} {}", sortTaskId, result);
+            closePulsarClient();
+            closeTubeSessionFactory();
+            logger.info("close finished {}", sortTaskId);
+            return true;
         } catch (Throwable th) {
-            logger.info("close error {} {}", sortTaskId, th);
+            logger.error("close error " + sortTaskId, th);
 
         }
-        return result;
+        return false;
     }
 
     private void closeAllFetcher() {
@@ -182,6 +208,36 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
         }
     }
 
+    private void closePulsarClient() {
+        for (Map.Entry<String, PulsarClient> entry : pulsarClients.entrySet()) 
{
+            PulsarClient pulsarClient = entry.getValue();
+            String key = entry.getKey();
+            try {
+                if (pulsarClient != null) {
+                    pulsarClient.close();
+                }
+            } catch (Exception e) {
+                logger.error("close PulsarClient" + key + " error.", e);
+            }
+        }
+        pulsarClients.clear();
+    }
+
+    private void closeTubeSessionFactory() {
+        for (Map.Entry<String, TubeConsumerCreater> entry : 
tubeFactories.entrySet()) {
+            MessageSessionFactory tubeMessageSessionFactory = 
entry.getValue().getMessageSessionFactory();
+            String key = entry.getKey();
+            try {
+                if (tubeMessageSessionFactory != null) {
+                    tubeMessageSessionFactory.shutdown();
+                }
+            } catch (Exception e) {
+                logger.error("close MessageSessionFactory" + key + " error.", 
e);
+            }
+        }
+        tubeFactories.clear();
+    }
+
     private List<String> getNewTopics(List<InLongTopic> 
newSubscribedInLongTopics) {
         if (newSubscribedInLongTopics != null && 
newSubscribedInLongTopics.size() > 0) {
             List<String> newTopics = new ArrayList<>();
@@ -195,6 +251,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
 
     private void handleCurrentConsumeConfig(List<InLongTopic> 
currentConsumeConfig) {
         if (null == currentConsumeConfig) {
+            logger.warn("List<InLongTopic> currentConsumeConfig is null");
             return;
         }
 
@@ -221,11 +278,11 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
     /**
      * offline inlong topic which not belong the sortTaskId
      *
-     * @param oldInLongTopics List
+     * @param oldInLongTopics {@link List<String>}
      */
     private void offlineRmovedTopic(List<String> oldInLongTopics) {
         for (String fetchKey : oldInLongTopics) {
-            logger.info("offlineRmovedTopic :{}", fetchKey);
+            logger.info("offlineRmovedTopic {}", fetchKey);
             InLongTopic inLongTopic = fetchers.get(fetchKey).getInLongTopic();
             InLongTopicFetcher inLongTopicFetcher = 
fetchers.getOrDefault(fetchKey, null);
             if (inLongTopicFetcher != null) {
@@ -254,6 +311,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
     private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics, 
List<String> reallyNewTopic) {
         for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
             if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
+                
logger.info("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
                 continue;
             }
             onlineTopic(inLongTopic);
@@ -262,10 +320,13 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
 
     private void onlineTopic(InLongTopic inLongTopic) {
         if 
(InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+            logger.info("the topic is pulsar:{}", inLongTopic);
             onlinePulsarTopic(inLongTopic);
         } else if 
(InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+            logger.info("the topic is kafka:{}", inLongTopic);
             onlineKafkaTopic(inLongTopic);
         } else if 
(InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+            logger.info("the topic is tube:{}", inLongTopic);
             onlineTubeTopic(inLongTopic);
         } else {
             logger.error("topic type:{} not support", 
inLongTopic.getTopicType());
@@ -274,25 +335,10 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
 
     private void onlinePulsarTopic(InLongTopic inLongTopic) {
         if (!checkAndCreateNewPulsarClient(inLongTopic)) {
+            logger.error("checkAndCreateNewPulsarClient error:{}", 
inLongTopic);
             return;
         }
-
-        if (!fetchers.containsKey(inLongTopic.getTopicKey())) {
-            logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
-            if (context != null && context.getStatManager() != null) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addTopicOnlineTimes(1);
-                InLongTopicFetcher fetcher = addFetcher(inLongTopic);
-                if (fetcher == null) {
-                    fetchers.remove(inLongTopic.getTopicKey());
-                    logger.error("add fetcher error:{}", 
inLongTopic.getTopicKey());
-                }
-            } else {
-                logger.error("context == null or context.getStatManager() == 
null");
-            }
-        }
+        createNewFetcher(inLongTopic);
     }
 
     private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
@@ -308,6 +354,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
                             inLongTopic.getInLongCluster().getBootstraps(),
                             inLongTopic.getInLongCluster().getToken());
                 } catch (Exception e) {
+                    logger.error("create pulsar client error {}", inLongTopic);
                     logger.error(e.getMessage(), e);
                     return false;
                 }
@@ -316,6 +363,34 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
                 return false;
             }
         }
+        logger.info("create pulsar client true {}", inLongTopic);
+        return true;
+    }
+
+    private boolean checkAndCreateNewTubeSessionFactory(InLongTopic 
inLongTopic) {
+        if 
(!tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
+            if (inLongTopic.getInLongCluster().getBootstraps() != null) {
+                try {
+                    //create MessageSessionFactory
+                    TubeClientConfig tubeConfig = new 
TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps());
+                    MessageSessionFactory messageSessionFactory = new 
TubeSingleSessionFactory(tubeConfig);
+                    TubeConsumerCreater tubeConsumerCreater = new 
TubeConsumerCreater(messageSessionFactory,
+                            tubeConfig);
+                    
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), 
tubeConsumerCreater);
+                    logger.info("create tube client succ {} {} {}", 
inLongTopic.getInLongCluster().getClusterId(),
+                            inLongTopic.getInLongCluster().getBootstraps(),
+                            inLongTopic.getInLongCluster().getToken());
+                } catch (Exception e) {
+                    logger.error("create tube client error {}", inLongTopic);
+                    logger.error(e.getMessage(), e);
+                    return false;
+                }
+            } else {
+                logger.info("bootstrap is null {}", 
inLongTopic.getInLongCluster());
+                return false;
+            }
+        }
+        logger.info("create pulsar client true {}", inLongTopic);
         return true;
     }
 
@@ -323,6 +398,30 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
     }
 
     private void onlineTubeTopic(InLongTopic inLongTopic) {
+        if (!checkAndCreateNewTubeSessionFactory(inLongTopic)) {
+            logger.error("checkAndCreateNewPulsarClient error:{}", 
inLongTopic);
+            return;
+        }
+        createNewFetcher(inLongTopic);
+    }
+
+    private void createNewFetcher(InLongTopic inLongTopic) {
+        if (!fetchers.containsKey(inLongTopic.getTopicKey())) {
+            logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
+            if (context != null && context.getStatManager() != null) {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                        .addTopicOnlineTimes(1);
+                InLongTopicFetcher fetcher = addFetcher(inLongTopic);
+                if (fetcher == null) {
+                    fetchers.remove(inLongTopic.getTopicKey());
+                    logger.error("add fetcher error:{}", 
inLongTopic.getTopicKey());
+                }
+            } else {
+                logger.error("context == null or context.getStatManager() == 
null");
+            }
+        }
     }
 
     private class UpdateMetaDataThread extends PeriodicTask {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
index ad463e5..9803936 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
@@ -48,12 +48,10 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
     private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
     private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
     private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
-    private volatile boolean closed = false;
+
     private Consumer<byte[]> consumer;
-    private volatile boolean stopConsume = false;
+
     private volatile Thread fetchThread;
-    private long sleepTime = 0L;
-    private int emptyPollTimes = 0;
 
     public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
             ClientContext context) {
@@ -62,12 +60,12 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
 
     @Override
     public void stopConsume(boolean stopConsume) {
-        this.stopConsume = stopConsume;
+        this.isStopConsume = stopConsume;
     }
 
     @Override
     public boolean isConsumeStop() {
-        return stopConsume;
+        return isStopConsume;
     }
 
     @Override
@@ -77,18 +75,18 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
 
     @Override
     public long getConsumedDataSize() {
-        return 0;
+        return 0L;
     }
 
     @Override
     public long getAckedOffset() {
-        return 0;
+        return 0L;
     }
 
     private void ackSucc(String offset) {
         offsetCache.remove(offset);
         
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1);
+                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1L);
     }
 
     /**
@@ -103,7 +101,7 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                 if (consumer == null) {
                     
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
                             inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                            .addAckFailTimes(1);
+                            .addAckFailTimes(1L);
                     logger.error("consumer == null");
                     return;
                 }
@@ -111,7 +109,7 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                 if (messageId == null) {
                     
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
                             inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                            .addAckFailTimes(1);
+                            .addAckFailTimes(1L);
                     logger.error("messageId == null");
                     return;
                 }
@@ -121,12 +119,12 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                             logger.error("ack fail:{}", msgOffset);
                             
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
                                     
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                    .addAckFailTimes(1);
+                                    .addAckFailTimes(1L);
                             return null;
                         });
             } catch (Exception e) {
                 
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1);
+                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1L);
                 logger.error(e.getMessage(), e);
                 throw e;
             }
@@ -240,17 +238,17 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                 context.getStatManager()
                         .getStatistics(context.getConfig().getSortTaskId(),
                                 inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimes(1);
+                        .addCallbackTimes(1L);
                 
context.getConfig().getCallback().onFinishedBatch(messageRecords);
                 context.getStatManager()
                         .getStatistics(context.getConfig().getSortTaskId(),
                                 inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimeCost(System.currentTimeMillis() - 
start).addCallbackDoneTimes(1);
+                        .addCallbackTimeCost(System.currentTimeMillis() - 
start).addCallbackDoneTimes(1L);
             } catch (Exception e) {
                 context.getStatManager()
                         .getStatistics(context.getConfig().getSortTaskId(),
                                 inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackErrorTimes(1);
+                        .addCallbackErrorTimes(1L);
                 e.printStackTrace();
             }
         }
@@ -265,7 +263,7 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
             while (true) {
                 hasPermit = false;
                 try {
-                    if (context.getConfig().isStopConsume() || stopConsume) {
+                    if (context.getConfig().isStopConsume() || isStopConsume) {
                         TimeUnit.MILLISECONDS.sleep(50);
                         continue;
                     }
@@ -279,7 +277,7 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                     context.getStatManager()
                             .getStatistics(context.getConfig().getSortTaskId(),
                                     
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addMsgCount(1).addFetchTimes(1);
+                            .addMsgCount(1L).addFetchTimes(1L);
 
                     long startFetchTime = System.currentTimeMillis();
                     Messages<byte[]> messages = consumer.batchReceive();
@@ -311,19 +309,19 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                         context.getStatManager()
                                 
.getStatistics(context.getConfig().getSortTaskId(),
                                         
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                .addEmptyFetchTimes(1);
-                        emptyPollTimes++;
-                        if (emptyPollTimes >= 
context.getConfig().getEmptyPollTimes()) {
+                                .addEmptyFetchTimes(1L);
+                        emptyFetchTimes++;
+                        if (emptyFetchTimes >= 
context.getConfig().getEmptyPollTimes()) {
                             sleepTime = Math.min((sleepTime += 
context.getConfig().getEmptyPollSleepStepMs()),
                                     
context.getConfig().getMaxEmptyPollSleepMs());
-                            emptyPollTimes = 0;
+                            emptyFetchTimes = 0;
                         }
                     }
                 } catch (Exception e) {
                     context.getStatManager()
                             .getStatistics(context.getConfig().getSortTaskId(),
                                     
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addFetchErrorTimes(1);
+                            .addFetchErrorTimes(1L);
                     logger.error(e.getMessage(), e);
                 } finally {
                     if (hasPermit) {
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
new file mode 100644
index 0000000..63abb3f
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.tube;
+
+import com.google.common.base.Splitter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.api.SysConstants;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fetcher that pulls the messages of one {@link InLongTopic} from a TubeMQ cluster and forwards every
+ * fetched batch to the callback configured on the {@link ClientContext}.
+ *
+ * <p>{@link #init(Object)} creates a pull consumer, subscribes it to the topic and starts a dedicated
+ * fetch thread; {@link #close()} stops that thread and shuts the consumer down.
+ */
+public class InLongTubeFetcherImpl extends InLongTopicFetcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InLongTubeFetcherImpl.class);
+    /** Separator between the {@code key=value} entries of a message attribute string. */
+    private static final String ATTRIBUTE_ENTRY_SEPARATOR = "&";
+    /** Separator between key and value inside one attribute entry. */
+    private static final String ATTRIBUTE_KV_SEPARATOR = "=";
+    /** Splitter for attribute strings, built once instead of once per fetched message. */
+    private static final Splitter ATTRIBUTE_SPLITTER = Splitter.on(ATTRIBUTE_ENTRY_SEPARATOR);
+
+    private PullMessageConsumer messageConsumer;
+    private volatile Thread fetchThread;
+    /** Set by {@link #close()} so the fetch loop ends even if the thread interrupt gets swallowed. */
+    private volatile boolean fetchStopped;
+
+    public InLongTubeFetcherImpl(InLongTopic inLongTopic, ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    /**
+     * Creates the pull consumer, subscribes it to the topic and starts the fetch thread.
+     *
+     * @param object must be a {@link TubeConsumerCreater}; any other type raises a ClassCastException
+     * @return true when the consumer was created and subscribed and the fetch thread was started,
+     *         false when no consumer could be created or any step threw an exception
+     */
+    @Override
+    public boolean init(Object object) {
+        TubeConsumerCreater tubeConsumerCreater = (TubeConsumerCreater) object;
+        TubeClientConfig tubeClientConfig = tubeConsumerCreater.getTubeClientConfig();
+        try {
+            ConsumerConfig consumerConfig = new ConsumerConfig(tubeClientConfig.getMasterInfo(),
+                    context.getConfig().getSortTaskId());
+
+            messageConsumer = tubeConsumerCreater.getMessageSessionFactory().createPullConsumer(consumerConfig);
+            if (messageConsumer == null) {
+                return false;
+            }
+            messageConsumer.subscribe(inLongTopic.getTopic(), resolveTopicFilters());
+            messageConsumer.completeSubscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + StringUtil
+                    .formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchStopped = false;
+            this.fetchThread = new Thread(new Fetcher(), threadName);
+            this.fetchThread.start();
+        } catch (Exception e) {
+            LOG.error("init tube fetcher failed, topic: {}", inLongTopic, e);
+            // Do not leave a half-initialised consumer connected when subscribing failed.
+            shutdownConsumerQuietly();
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Reads the optional filter set stored in the topic properties under
+     * {@link SysConstants#TUBE_TOPIC_FILTER_KEY}.
+     *
+     * @return the configured filters, or null when the topic has no properties or no filter entry
+     */
+    @SuppressWarnings("unchecked")
+    private TreeSet<String> resolveTopicFilters() {
+        if (inLongTopic.getProperties() == null
+                || !inLongTopic.getProperties().containsKey(SysConstants.TUBE_TOPIC_FILTER_KEY)) {
+            return null;
+        }
+        // A value of another type fails with ClassCastException, which init() reports as a failed init.
+        return (TreeSet<String>) inLongTopic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY);
+    }
+
+    /**
+     * Shuts the consumer down if one exists, logging instead of propagating any failure.
+     */
+    private void shutdownConsumerQuietly() {
+        if (messageConsumer == null) {
+            return;
+        }
+        try {
+            messageConsumer.shutdown();
+        } catch (Throwable throwable) {
+            LOG.error("shutdown tube consumer failed, topic: {}", inLongTopic, throwable);
+        }
+    }
+
+    /**
+     * Confirms consumption of the batch identified by the given confirm context and records the
+     * outcome in the ack success/fail statistics. An empty or null offset is ignored.
+     *
+     * @param msgOffset confirm context of the batch to acknowledge
+     * @throws Exception any exception raised by the consumer while confirming; it is counted as an
+     *         ack failure, logged and rethrown
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (messageConsumer == null) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1L);
+                    LOG.error("consumer == null");
+                    return;
+                }
+
+                ConsumerResult consumerResult = messageConsumer.confirmConsume(msgOffset, true);
+                int errCode = consumerResult.getErrCode();
+                if (TErrCodeConstants.SUCCESS != errCode) {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckFailTimes(1L);
+                } else {
+                    context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addAckSuccTimes(1L);
+                }
+            } catch (Exception e) {
+                context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addAckFailTimes(1L);
+                LOG.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Marks the fetcher as closed.
+     * NOTE(review): the fetch loop of this class does not read the flag, so fetching continues.
+     */
+    @Override
+    public void pause() {
+        this.closed = true;
+    }
+
+    /**
+     * Clears the closed flag set by {@link #pause()}.
+     */
+    @Override
+    public void resume() {
+        this.closed = false;
+    }
+
+    /**
+     * Stops the fetch thread and shuts the consumer down. Failures are logged, never propagated.
+     *
+     * @return always true
+     */
+    @Override
+    public boolean close() {
+        this.closed = true;
+        // Raise the stop flag before interrupting so the loop exits even if the interrupt is swallowed.
+        this.fetchStopped = true;
+        try {
+            if (fetchThread != null) {
+                fetchThread.interrupt();
+            }
+        } catch (Throwable throwable) {
+            LOG.error("interrupt fetch thread failed, topic: {}", inLongTopic, throwable);
+        }
+        shutdownConsumerQuietly();
+        LOG.info("closed {}", inLongTopic);
+        return true;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.closed;
+    }
+
+    /**
+     * Switches consumption off or on; while switched off the fetch loop idles without fetching.
+     *
+     * @param stopConsume true to stop fetching, false to continue
+     */
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.isStopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return this.isStopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    /**
+     * Not tracked by this implementation.
+     *
+     * @return always 0
+     */
+    @Override
+    public long getConsumedDataSize() {
+        return 0L;
+    }
+
+    /**
+     * Not tracked by this implementation.
+     *
+     * @return always 0
+     */
+    @Override
+    public long getAckedOffset() {
+        return 0L;
+    }
+
+    /**
+     * Fetch loop executed on the dedicated fetch thread started by {@link #init(Object)}.
+     */
+    public class Fetcher implements Runnable {
+
+        /**
+         * Passes one fetched record to the configured callback and records callback statistics.
+         * Exceptions thrown by the callback are counted and logged, not propagated.
+         *
+         * @param messageRecord {@link MessageRecord}
+         */
+        private void handleAndCallbackMsg(MessageRecord messageRecord) {
+            long start = System.currentTimeMillis();
+            try {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addCallbackTimes(1L);
+                context.getConfig().getCallback().onFinishedBatch(Collections.singletonList(messageRecord));
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addCallbackTimeCost(System.currentTimeMillis() - start).addCallbackDoneTimes(1L);
+            } catch (Exception e) {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                        .addCallbackErrorTimes(1L);
+                LOG.error("callback failed, topic: {}", inLongTopic, e);
+            }
+        }
+
+        /**
+         * Parses an attribute string such as {@code k1=v1&k2=v2} into a key/value map. An entry
+         * without a key/value separator (or starting with it) is stored whole as key with a null value.
+         *
+         * @param splitter {@link Splitter} that cuts the string into entries
+         * @param attr attribute string, may be null or empty
+         * @param entrySplitterStr separator between key and value inside one entry
+         * @return mutable map of the parsed entries; empty when {@code attr} is null or empty
+         */
+        private Map<String, String> parseAttr(Splitter splitter, String attr, String entrySplitterStr) {
+            Map<String, String> map = new HashMap<>();
+            if (StringUtils.isEmpty(attr)) {
+                // Splitter.split rejects null, and "" would otherwise yield a bogus "" -> null entry.
+                return map;
+            }
+            for (String s : splitter.split(attr)) {
+                int idx = s.indexOf(entrySplitterStr);
+                String k = s;
+                String v = null;
+                if (idx > 0) {
+                    k = s.substring(0, idx);
+                    // Skip the whole separator, not just one character.
+                    v = s.substring(idx + entrySplitterStr.length());
+                }
+                map.put(k, v);
+            }
+            return map;
+        }
+
+        private Map<String, String> getAttributeMap(String attribute) {
+            return parseAttr(ATTRIBUTE_SPLITTER, attribute, ATTRIBUTE_KV_SEPARATOR);
+        }
+
+        /**
+         * Repeatedly fetches a batch from the consumer and hands it to the callback until the
+         * fetcher is closed or the thread is interrupted. Empty fetches increase the sleep between
+         * polls step by step up to the configured maximum; a successful fetch resets it.
+         */
+        @Override
+        public void run() {
+            boolean hasPermit;
+            while (!fetchStopped && !Thread.currentThread().isInterrupted()) {
+                hasPermit = false;
+                try {
+                    if (context.getConfig().isStopConsume() || isStopConsume) {
+                        TimeUnit.MILLISECONDS.sleep(50L);
+                        continue;
+                    }
+
+                    if (sleepTime > 0) {
+                        TimeUnit.MILLISECONDS.sleep(sleepTime);
+                    }
+
+                    context.acquireRequestPermit();
+                    hasPermit = true;
+                    // NOTE(review): msgCount is also increased once per received message below, so this
+                    // per-fetch increment looks like double counting - confirm the intended semantics.
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addMsgCount(1L).addFetchTimes(1L);
+
+                    long startFetchTime = System.currentTimeMillis();
+                    ConsumerResult message = messageConsumer.getMessage();
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addFetchTimeCost(System.currentTimeMillis() - startFetchTime);
+                    if (null != message && TErrCodeConstants.SUCCESS == message.getErrCode()) {
+                        List<InLongMessage> msgs = new ArrayList<>();
+                        for (Message msg : message.getMessageList()) {
+                            msgs.add(new InLongMessage(msg.getData(), getAttributeMap(msg.getAttribute())));
+                            context.getStatManager()
+                                    .getStatistics(context.getConfig().getSortTaskId(),
+                                            inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addMsgCount(1L).addConsumeSize(msg.getData().length);
+                        }
+
+                        handleAndCallbackMsg(new MessageRecord(inLongTopic.getTopicKey(), msgs,
+                                message.getConfirmContext(), System.currentTimeMillis()));
+                        sleepTime = 0L;
+                    } else {
+                        context.getStatManager()
+                                .getStatistics(context.getConfig().getSortTaskId(),
+                                        inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                .addEmptyFetchTimes(1L);
+                        emptyFetchTimes++;
+                        if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
+                            sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
+                                    context.getConfig().getMaxEmptyPollSleepMs());
+                            emptyFetchTimes = 0;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // close() interrupts this thread: restore the flag so the loop condition ends the
+                    // thread instead of treating the interrupt as a fetch error and polling forever.
+                    Thread.currentThread().interrupt();
+                    LOG.info("fetch thread interrupted, stop fetching {}", inLongTopic);
+                } catch (Exception e) {
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addFetchErrorTimes(1L);
+                    LOG.error(e.getMessage(), e);
+                } finally {
+                    if (hasPermit) {
+                        context.releaseRequestPermit();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/TubeConsumerCreater.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/TubeConsumerCreater.java
new file mode 100644
index 0000000..9160764
--- /dev/null
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/TubeConsumerCreater.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.tube;
+
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+
+/**
+ * Immutable holder that carries a TubeMQ {@link MessageSessionFactory} together with a
+ * {@link TubeClientConfig}, so both can be handed around as a single object.
+ */
+public class TubeConsumerCreater {
+
+    private final TubeClientConfig tubeClientConfig;
+    private final MessageSessionFactory sessionFactory;
+
+    /**
+     * Stores the given factory and client config; neither argument is validated or copied.
+     *
+     * @param messageSessionFactory session factory returned by {@link #getMessageSessionFactory()}
+     * @param consumerConfig client config returned by {@link #getTubeClientConfig()}
+     */
+    public TubeConsumerCreater(MessageSessionFactory messageSessionFactory,
+            TubeClientConfig consumerConfig) {
+        tubeClientConfig = consumerConfig;
+        sessionFactory = messageSessionFactory;
+    }
+
+    /**
+     * Returns the session factory passed to the constructor.
+     *
+     * @return the stored {@link MessageSessionFactory}
+     */
+    public MessageSessionFactory getMessageSessionFactory() {
+        return this.sessionFactory;
+    }
+
+    /**
+     * Returns the client config passed to the constructor.
+     *
+     * @return the stored {@link TubeClientConfig}
+     */
+    public TubeClientConfig getTubeClientConfig() {
+        return this.tubeClientConfig;
+    }
+}

Reply via email to