This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f340ce5 [INLONG-1931][Feature][sort-sdk] sort-sdk support consume
tube events from cachezone (#2186)
f340ce5 is described below
commit f340ce59ff10bf05df47a75be7e4855ea88cac1b
Author: wardli <[email protected]>
AuthorDate: Thu Jan 20 20:19:14 2022 +0800
[INLONG-1931][Feature][sort-sdk] sort-sdk support consume tube events from
cachezone (#2186)
---
inlong-sdk/sort-sdk/pom.xml | 6 +
.../inlong/sdk/sort/api/InLongTopicFetcher.java | 5 +
.../apache/inlong/sdk/sort/api/SysConstants.java | 26 ++
.../inlong/sdk/sort/entity/CacheZoneCluster.java | 5 +
.../apache/inlong/sdk/sort/entity/InLongTopic.java | 17 +-
.../sdk/sort/impl/InLongTopicManagerImpl.java | 167 +++++++++---
.../sort/impl/pulsar/InLongPulsarFetcherImpl.java | 44 ++-
.../sdk/sort/impl/tube/InLongTubeFetcherImpl.java | 300 +++++++++++++++++++++
.../sdk/sort/impl/tube/TubeConsumerCreater.java | 59 ++++
9 files changed, 566 insertions(+), 63 deletions(-)
diff --git a/inlong-sdk/sort-sdk/pom.xml b/inlong-sdk/sort-sdk/pom.xml
index cb42251..42b0336 100644
--- a/inlong-sdk/sort-sdk/pom.xml
+++ b/inlong-sdk/sort-sdk/pom.xml
@@ -118,6 +118,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
index e2ef03e..b113a82 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
@@ -23,6 +23,11 @@ public abstract class InLongTopicFetcher {
protected InLongTopic inLongTopic;
protected ClientContext context;
+ protected volatile boolean closed = false;
+ protected volatile boolean isStopConsume = false;
+ // use for empty topic to sleep
+ protected long sleepTime = 0L;
+ protected int emptyFetchTimes = 0;
public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
this.inLongTopic = inLongTopic;
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SysConstants.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SysConstants.java
new file mode 100644
index 0000000..79635bc
--- /dev/null
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SysConstants.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sdk.sort.api;
+
+public class SysConstants {
+
+ public static final String TUBE_TOPIC_FILTER_KEY = "tube_topic_filter_key";
+
+}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
index 9616cf4..4bac733 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneCluster.java
@@ -71,4 +71,9 @@ public class CacheZoneCluster {
public int hashCode() {
return Objects.hash(clusterId);
}
+
+ @Override
+ public String toString() {
+ return "CacheZoneCluster>>>" + clusterId + "|" + bootstraps + "|" +
token;
+ }
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
index 1ae5867..384f952 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/InLongTopic.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sdk.sort.entity;
+import java.util.Map;
import java.util.Objects;
public class InLongTopic {
@@ -26,6 +27,7 @@ public class InLongTopic {
private int partitionId;
//pulsar,kafka,tube
private String topicType;
+ private Map<String, Object> properties;
public String getTopic() {
return topic;
@@ -59,6 +61,14 @@ public class InLongTopic {
this.topicType = topicType;
}
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, Object> properties) {
+ this.properties = properties;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -84,11 +94,6 @@ public class InLongTopic {
@Override
public String toString() {
- return "InLongTopic{"
- + "topic='" + topic
- + ", inLongCluster=" + cacheZoneCluster
- + ", partitionId=" + partitionId
- + ", topicType='" + topicType
- + '}';
+ return "InLongTopic>>>" + topic + "|" + "|" + partitionId + "|" +
topicType + "|" + cacheZoneCluster;
}
}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
index cfbd56c..2f310e5 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,8 +36,13 @@ import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
+import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl;
+import org.apache.inlong.sdk.sort.impl.tube.TubeConsumerCreater;
import org.apache.inlong.sdk.sort.util.PeriodicTask;
import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
@@ -49,6 +55,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
private final ConcurrentHashMap<String, InLongTopicFetcher> fetchers
= new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, TubeConsumerCreater> tubeFactories
= new ConcurrentHashMap<>();
private final PeriodicTask updateMetaDataWorker;
private volatile List<String> toBeSelectFetchers = new ArrayList<>();
@@ -69,26 +76,25 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
@Override
public InLongTopicFetcher addFetcher(InLongTopic inLongTopic) {
+
try {
InLongTopicFetcher result =
fetchers.get(inLongTopic.getTopicKey());
if (result == null) {
- InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, context);
+ // create fetcher (pulsar,tube,kafka)
+ InLongTopicFetcher inLongTopicFetcher =
createInLongTopicFetcher(inLongTopic);
InLongTopicFetcher preValue =
fetchers.putIfAbsent(inLongTopic.getTopicKey(), inLongTopicFetcher);
logger.info("addFetcher :{}", inLongTopic.getTopicKey());
if (preValue != null) {
result = preValue;
- inLongTopicFetcher.close();
+ if (inLongTopicFetcher != null) {
+ inLongTopicFetcher.close();
+ }
logger.info("addFetcher create same fetcher {}",
inLongTopic);
} else {
result = inLongTopicFetcher;
- PulsarClient pulsarClient =
pulsarClients.get(inLongTopic.getInLongCluster().getClusterId());
- if (null == pulsarClient) {
- logger.error("pulsar client is null:{}",
inLongTopic.getInLongCluster().getClusterId());
- return null;
- }
-
- if (!result.init(pulsarClient)) {
- logger.info("addFetcher init fail:{}",
inLongTopic.getTopicKey());
+ if (result != null
+ &&
!result.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId())))
{
+ logger.info("addFetcher init fail {}",
inLongTopic.getTopicKey());
result.close();
result = null;
}
@@ -100,6 +106,25 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
}
}
+ /**
+ * create fetcher (pulsar,tube,kafka)
+ *
+ * @param inLongTopic {@link InLongTopic}
+ * @return {@link InLongTopicFetcher}
+ */
+ private InLongTopicFetcher createInLongTopicFetcher(InLongTopic
inLongTopic) {
+ if
(InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+ logger.info("the topic is pulsar {}", inLongTopic);
+ return new InLongPulsarFetcherImpl(inLongTopic, context);
+ } else if
(InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+ logger.info("the topic is tube {}", inLongTopic);
+ return new InLongTubeFetcherImpl(inLongTopic, context);
+ } else {
+ logger.error("topic type not support " +
inLongTopic.getTopicType());
+ return null;
+ }
+ }
+
@Override
public InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean
closeFetcher) {
InLongTopicFetcher result = fetchers.remove(inLongTopic.getTopicKey());
@@ -142,7 +167,6 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
@Override
public boolean clean() {
- boolean result = false;
String sortTaskId = context.getConfig().getSortTaskId();
try {
logger.info("start close {}", sortTaskId);
@@ -152,13 +176,15 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
}
closeFetcher();
- result = true;
- logger.info("close finished {} {}", sortTaskId, result);
+ closePulsarClient();
+ closeTubeSessionFactory();
+ logger.info("close finished {}", sortTaskId);
+ return true;
} catch (Throwable th) {
- logger.info("close error {} {}", sortTaskId, th);
+ logger.error("close error " + sortTaskId, th);
}
- return result;
+ return false;
}
private void closeAllFetcher() {
@@ -182,6 +208,36 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
}
}
+ private void closePulsarClient() {
+ for (Map.Entry<String, PulsarClient> entry : pulsarClients.entrySet())
{
+ PulsarClient pulsarClient = entry.getValue();
+ String key = entry.getKey();
+ try {
+ if (pulsarClient != null) {
+ pulsarClient.close();
+ }
+ } catch (Exception e) {
+ logger.error("close PulsarClient" + key + " error.", e);
+ }
+ }
+ pulsarClients.clear();
+ }
+
+ private void closeTubeSessionFactory() {
+ for (Map.Entry<String, TubeConsumerCreater> entry :
tubeFactories.entrySet()) {
+ MessageSessionFactory tubeMessageSessionFactory =
entry.getValue().getMessageSessionFactory();
+ String key = entry.getKey();
+ try {
+ if (tubeMessageSessionFactory != null) {
+ tubeMessageSessionFactory.shutdown();
+ }
+ } catch (Exception e) {
+ logger.error("close MessageSessionFactory" + key + " error.",
e);
+ }
+ }
+ tubeFactories.clear();
+ }
+
private List<String> getNewTopics(List<InLongTopic>
newSubscribedInLongTopics) {
if (newSubscribedInLongTopics != null &&
newSubscribedInLongTopics.size() > 0) {
List<String> newTopics = new ArrayList<>();
@@ -195,6 +251,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
private void handleCurrentConsumeConfig(List<InLongTopic>
currentConsumeConfig) {
if (null == currentConsumeConfig) {
+ logger.warn("List<InLongTopic> currentConsumeConfig is null");
return;
}
@@ -221,11 +278,11 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
/**
* offline inlong topic which not belong the sortTaskId
*
- * @param oldInLongTopics List
+ * @param oldInLongTopics {@link List<String>}
*/
private void offlineRmovedTopic(List<String> oldInLongTopics) {
for (String fetchKey : oldInLongTopics) {
- logger.info("offlineRmovedTopic :{}", fetchKey);
+ logger.info("offlineRmovedTopic {}", fetchKey);
InLongTopic inLongTopic = fetchers.get(fetchKey).getInLongTopic();
InLongTopicFetcher inLongTopicFetcher =
fetchers.getOrDefault(fetchKey, null);
if (inLongTopicFetcher != null) {
@@ -254,6 +311,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics,
List<String> reallyNewTopic) {
for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
+
logger.info("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
continue;
}
onlineTopic(inLongTopic);
@@ -262,10 +320,13 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
private void onlineTopic(InLongTopic inLongTopic) {
if
(InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
+ logger.info("the topic is pulsar:{}", inLongTopic);
onlinePulsarTopic(inLongTopic);
} else if
(InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
+ logger.info("the topic is kafka:{}", inLongTopic);
onlineKafkaTopic(inLongTopic);
} else if
(InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
+ logger.info("the topic is tube:{}", inLongTopic);
onlineTubeTopic(inLongTopic);
} else {
logger.error("topic type:{} not support",
inLongTopic.getTopicType());
@@ -274,25 +335,10 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
private void onlinePulsarTopic(InLongTopic inLongTopic) {
if (!checkAndCreateNewPulsarClient(inLongTopic)) {
+ logger.error("checkAndCreateNewPulsarClient error:{}",
inLongTopic);
return;
}
-
- if (!fetchers.containsKey(inLongTopic.getTopicKey())) {
- logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
- if (context != null && context.getStatManager() != null) {
- context.getStatManager()
- .getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addTopicOnlineTimes(1);
- InLongTopicFetcher fetcher = addFetcher(inLongTopic);
- if (fetcher == null) {
- fetchers.remove(inLongTopic.getTopicKey());
- logger.error("add fetcher error:{}",
inLongTopic.getTopicKey());
- }
- } else {
- logger.error("context == null or context.getStatManager() ==
null");
- }
- }
+ createNewFetcher(inLongTopic);
}
private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
@@ -308,6 +354,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
inLongTopic.getInLongCluster().getBootstraps(),
inLongTopic.getInLongCluster().getToken());
} catch (Exception e) {
+ logger.error("create pulsar client error {}", inLongTopic);
logger.error(e.getMessage(), e);
return false;
}
@@ -316,6 +363,34 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
return false;
}
}
+ logger.info("create pulsar client true {}", inLongTopic);
+ return true;
+ }
+
+ private boolean checkAndCreateNewTubeSessionFactory(InLongTopic
inLongTopic) {
+ if
(!tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
+ if (inLongTopic.getInLongCluster().getBootstraps() != null) {
+ try {
+ //create MessageSessionFactory
+ TubeClientConfig tubeConfig = new
TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps());
+ MessageSessionFactory messageSessionFactory = new
TubeSingleSessionFactory(tubeConfig);
+ TubeConsumerCreater tubeConsumerCreater = new
TubeConsumerCreater(messageSessionFactory,
+ tubeConfig);
+
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(),
tubeConsumerCreater);
+ logger.info("create tube client succ {} {} {}",
inLongTopic.getInLongCluster().getClusterId(),
+ inLongTopic.getInLongCluster().getBootstraps(),
+ inLongTopic.getInLongCluster().getToken());
+ } catch (Exception e) {
+ logger.error("create tube client error {}", inLongTopic);
+ logger.error(e.getMessage(), e);
+ return false;
+ }
+ } else {
+ logger.info("bootstrap is null {}",
inLongTopic.getInLongCluster());
+ return false;
+ }
+ }
+ logger.info("create pulsar client true {}", inLongTopic);
return true;
}
@@ -323,6 +398,30 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
}
private void onlineTubeTopic(InLongTopic inLongTopic) {
+ if (!checkAndCreateNewTubeSessionFactory(inLongTopic)) {
+ logger.error("checkAndCreateNewPulsarClient error:{}",
inLongTopic);
+ return;
+ }
+ createNewFetcher(inLongTopic);
+ }
+
+ private void createNewFetcher(InLongTopic inLongTopic) {
+ if (!fetchers.containsKey(inLongTopic.getTopicKey())) {
+ logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
+ if (context != null && context.getStatManager() != null) {
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
+ .addTopicOnlineTimes(1);
+ InLongTopicFetcher fetcher = addFetcher(inLongTopic);
+ if (fetcher == null) {
+ fetchers.remove(inLongTopic.getTopicKey());
+ logger.error("add fetcher error:{}",
inLongTopic.getTopicKey());
+ }
+ } else {
+ logger.error("context == null or context.getStatManager() ==
null");
+ }
+ }
}
private class UpdateMetaDataThread extends PeriodicTask {
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
index ad463e5..9803936 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
@@ -48,12 +48,10 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
private final Logger logger =
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
private final ReentrantReadWriteLock mainLock = new
ReentrantReadWriteLock(true);
private final ConcurrentHashMap<String, MessageId> offsetCache = new
ConcurrentHashMap<>();
- private volatile boolean closed = false;
+
private Consumer<byte[]> consumer;
- private volatile boolean stopConsume = false;
+
private volatile Thread fetchThread;
- private long sleepTime = 0L;
- private int emptyPollTimes = 0;
public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
ClientContext context) {
@@ -62,12 +60,12 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
@Override
public void stopConsume(boolean stopConsume) {
- this.stopConsume = stopConsume;
+ this.isStopConsume = stopConsume;
}
@Override
public boolean isConsumeStop() {
- return stopConsume;
+ return isStopConsume;
}
@Override
@@ -77,18 +75,18 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
@Override
public long getConsumedDataSize() {
- return 0;
+ return 0L;
}
@Override
public long getAckedOffset() {
- return 0;
+ return 0L;
}
private void ackSucc(String offset) {
offsetCache.remove(offset);
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckSuccTimes(1);
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckSuccTimes(1L);
}
/**
@@ -103,7 +101,7 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
if (consumer == null) {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addAckFailTimes(1);
+ .addAckFailTimes(1L);
logger.error("consumer == null");
return;
}
@@ -111,7 +109,7 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
if (messageId == null) {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addAckFailTimes(1);
+ .addAckFailTimes(1L);
logger.error("messageId == null");
return;
}
@@ -121,12 +119,12 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
logger.error("ack fail:{}", msgOffset);
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addAckFailTimes(1);
+ .addAckFailTimes(1L);
return null;
});
} catch (Exception e) {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
- inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckFailTimes(1);
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckFailTimes(1L);
logger.error(e.getMessage(), e);
throw e;
}
@@ -240,17 +238,17 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimes(1);
+ .addCallbackTimes(1L);
context.getConfig().getCallback().onFinishedBatch(messageRecords);
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackTimeCost(System.currentTimeMillis() -
start).addCallbackDoneTimes(1);
+ .addCallbackTimeCost(System.currentTimeMillis() -
start).addCallbackDoneTimes(1L);
} catch (Exception e) {
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
- .addCallbackErrorTimes(1);
+ .addCallbackErrorTimes(1L);
e.printStackTrace();
}
}
@@ -265,7 +263,7 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
while (true) {
hasPermit = false;
try {
- if (context.getConfig().isStopConsume() || stopConsume) {
+ if (context.getConfig().isStopConsume() || isStopConsume) {
TimeUnit.MILLISECONDS.sleep(50);
continue;
}
@@ -279,7 +277,7 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addMsgCount(1).addFetchTimes(1);
+ .addMsgCount(1L).addFetchTimes(1L);
long startFetchTime = System.currentTimeMillis();
Messages<byte[]> messages = consumer.batchReceive();
@@ -311,19 +309,19 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addEmptyFetchTimes(1);
- emptyPollTimes++;
- if (emptyPollTimes >=
context.getConfig().getEmptyPollTimes()) {
+ .addEmptyFetchTimes(1L);
+ emptyFetchTimes++;
+ if (emptyFetchTimes >=
context.getConfig().getEmptyPollTimes()) {
sleepTime = Math.min((sleepTime +=
context.getConfig().getEmptyPollSleepStepMs()),
context.getConfig().getMaxEmptyPollSleepMs());
- emptyPollTimes = 0;
+ emptyFetchTimes = 0;
}
}
} catch (Exception e) {
context.getStatManager()
.getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
- .addFetchErrorTimes(1);
+ .addFetchErrorTimes(1L);
logger.error(e.getMessage(), e);
} finally {
if (hasPermit) {
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
new file mode 100644
index 0000000..63abb3f
--- /dev/null
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.tube;
+
+import com.google.common.base.Splitter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.api.SysConstants;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongTubeFetcherImpl extends InLongTopicFetcher {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(InLongTubeFetcherImpl.class);
+ private PullMessageConsumer messageConsumer;
+ private volatile Thread fetchThread;
+
+ public InLongTubeFetcherImpl(InLongTopic inLongTopic, ClientContext
context) {
+ super(inLongTopic, context);
+ }
+
+ @Override
+ public boolean init(Object object) {
+ TubeConsumerCreater tubeConsumerCreater = (TubeConsumerCreater) object;
+ TubeClientConfig tubeClientConfig =
tubeConsumerCreater.getTubeClientConfig();
+ try {
+ ConsumerConfig consumerConfig = new
ConsumerConfig(tubeClientConfig.getMasterInfo(),
+ context.getConfig().getSortTaskId());
+
+ messageConsumer =
tubeConsumerCreater.getMessageSessionFactory().createPullConsumer(consumerConfig);
+ if (messageConsumer != null) {
+ TreeSet<String> filters = null;
+ if (inLongTopic.getProperties() != null &&
inLongTopic.getProperties().containsKey(
+ SysConstants.TUBE_TOPIC_FILTER_KEY)) {
+ filters = (TreeSet<String>)
inLongTopic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY);
+ }
+ messageConsumer.subscribe(inLongTopic.getTopic(), filters);
+ messageConsumer.completeSubscribe();
+
+ String threadName = "sort_sdk_fetch_thread_" + StringUtil
+ .formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+ this.fetchThread = new Thread(new Fetcher(), threadName);
+ this.fetchThread.start();
+ } else {
+ return false;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void ack(String msgOffset) throws Exception {
+ if (!StringUtils.isEmpty(msgOffset)) {
+ try {
+ if (messageConsumer == null) {
+
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
+ .addAckFailTimes(1L);
+ LOG.error("consumer == null");
+ return;
+ }
+
+ ConsumerResult consumerResult =
messageConsumer.confirmConsume(msgOffset, true);
+ int errCode = consumerResult.getErrCode();
+ if (TErrCodeConstants.SUCCESS != errCode) {
+
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckFailTimes(1L);
+ } else {
+
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckSuccTimes(1L);
+ }
+ } catch (Exception e) {
+
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic()).addAckFailTimes(1L);
+ LOG.error(e.getMessage(), e);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void pause() {
+ this.closed = true;
+ }
+
+ @Override
+ public void resume() {
+ this.closed = false;
+ }
+
+ @Override
+ public boolean close() {
+ this.closed = true;
+ try {
+ if (fetchThread != null) {
+ fetchThread.interrupt();
+ }
+ if (messageConsumer != null) {
+ messageConsumer.shutdown();
+ }
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
+ }
+ LOG.info("closed {}", inLongTopic);
+ return true;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.closed;
+ }
+
+ @Override
+ public void stopConsume(boolean stopConsume) {
+ this.isStopConsume = stopConsume;
+ }
+
+ @Override
+ public boolean isConsumeStop() {
+ return this.isStopConsume;
+ }
+
+ @Override
+ public InLongTopic getInLongTopic() {
+ return inLongTopic;
+ }
+
+ @Override
+ public long getConsumedDataSize() {
+ return 0L;
+ }
+
+ @Override
+ public long getAckedOffset() {
+ return 0L;
+ }
+
+ public class Fetcher implements Runnable {
+
+ /**
+ * put the received msg to onFinished method
+ *
+ * @param messageRecord {@link MessageRecord}
+ */
+ private void handleAndCallbackMsg(MessageRecord messageRecord) {
+ long start = System.currentTimeMillis();
+ try {
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
+ .addCallbackTimes(1L);
+
context.getConfig().getCallback().onFinishedBatch(Collections.singletonList(messageRecord));
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
+ .addCallbackTimeCost(System.currentTimeMillis() -
start).addCallbackDoneTimes(1L);
+ } catch (Exception e) {
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+ inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
+ .addCallbackErrorTimes(1L);
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * parseAttr from k1=v1&k2=v2 to kv map
+ *
+ * @param splitter {@link Splitter}
+ * @param attr String
+ * @param entrySplitterStr String
+ * @return {@link Map<String,String>}
+ */
+ private Map<String, String> parseAttr(Splitter splitter, String attr,
String entrySplitterStr) {
+ Map<String, String> map = new HashMap<>();
+ for (String s : splitter.split(attr)) {
+ int idx = s.indexOf(entrySplitterStr);
+ String k = s;
+ String v = null;
+ if (idx > 0) {
+ k = s.substring(0, idx);
+ v = s.substring(idx + 1);
+ }
+ map.put(k, v);
+ }
+ return map;
+ }
+
+ private Map<String, String> getAttributeMap(String attribute) {
+ final Splitter splitter = Splitter.on("&");
+ return parseAttr(splitter, attribute, "=");
+ }
+
+ @Override
+ public void run() {
+ boolean hasPermit;
+ while (true) {
+ hasPermit = false;
+ try {
+ if (context.getConfig().isStopConsume() || isStopConsume) {
+ TimeUnit.MILLISECONDS.sleep(50L);
+ continue;
+ }
+
+ if (sleepTime > 0) {
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
+ }
+
+ context.acquireRequestPermit();
+ hasPermit = true;
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addMsgCount(1L).addFetchTimes(1L);
+
+ long startFetchTime = System.currentTimeMillis();
+ ConsumerResult message = messageConsumer.getMessage();
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addFetchTimeCost(System.currentTimeMillis() -
startFetchTime);
+ if (null != message && TErrCodeConstants.SUCCESS ==
message.getErrCode()) {
+ List<InLongMessage> msgs = new ArrayList<>();
+ for (Message msg : message.getMessageList()) {
+ msgs.add(new InLongMessage(msg.getData(),
getAttributeMap(msg.getAttribute())));
+ context.getStatManager()
+
.getStatistics(context.getConfig().getSortTaskId(),
+
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+
.addMsgCount(1L).addConsumeSize(msg.getData().length);
+ }
+
+ handleAndCallbackMsg(new
MessageRecord(inLongTopic.getTopicKey(), msgs,
+ message.getConfirmContext(),
System.currentTimeMillis()));
+ sleepTime = 0L;
+ } else {
+ context.getStatManager()
+
.getStatistics(context.getConfig().getSortTaskId(),
+
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addEmptyFetchTimes(1L);
+ emptyFetchTimes++;
+ if (emptyFetchTimes >=
context.getConfig().getEmptyPollTimes()) {
+ sleepTime = Math.min((sleepTime +=
context.getConfig().getEmptyPollSleepStepMs()),
+
context.getConfig().getMaxEmptyPollSleepMs());
+ emptyFetchTimes = 0;
+ }
+ }
+ } catch (Exception e) {
+ context.getStatManager()
+ .getStatistics(context.getConfig().getSortTaskId(),
+
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+ .addFetchErrorTimes(1L);
+ LOG.error(e.getMessage(), e);
+ } finally {
+ if (hasPermit) {
+ context.releaseRequestPermit();
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/TubeConsumerCreater.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/TubeConsumerCreater.java
new file mode 100644
index 0000000..9160764
--- /dev/null
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/TubeConsumerCreater.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.tube;
+
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+
+public class TubeConsumerCreater {
+
+ private final MessageSessionFactory messageSessionFactory;
+ private final TubeClientConfig consumerConfig;
+
+ /**
+ * TubeConsumerCreater constructor
+ *
+ * @param messageSessionFactory {@link MessageSessionFactory}
+ * @param consumerConfig {@link TubeClientConfig}
+ */
+ public TubeConsumerCreater(MessageSessionFactory messageSessionFactory,
+ TubeClientConfig consumerConfig) {
+ this.messageSessionFactory = messageSessionFactory;
+ this.consumerConfig = consumerConfig;
+ }
+
+ /**
+ * get MessageSessionFactory
+ *
+ * @return {@link MessageSessionFactory}
+ */
+ public MessageSessionFactory getMessageSessionFactory() {
+ return messageSessionFactory;
+ }
+
+ /**
+ * get TubeClientConfig
+ *
+ * @return {@link TubeClientConfig}
+ */
+ public TubeClientConfig getTubeClientConfig() {
+ return consumerConfig;
+ }
+}