This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new f243ff46 fix(window) remove isSplitsReceiver
new a4ff9b1f Merge pull request #223 from ni-ze/supportRsqldb
f243ff46 is described below
commit f243ff4674dfee2b6d9e983891739ebd3268e110
Author: 维章 <[email protected]>
AuthorDate: Tue Sep 27 16:37:01 2022 +0800
fix(window) remove isSplitsReceiver
---
.../rocketmq/streams/source/RocketMQSource.java | 55 -----------------
.../common/channel/source/AbstractSource.java | 3 -
.../streams/window/fire/SplitEventTimeManager.java | 72 ----------------------
3 files changed, 130 deletions(-)
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 2d199b54..4134c191 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -159,61 +159,6 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
}
}
- //todo 计算正在工作的分片?
- @Override
- public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
- DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExt.setVipChannelEnabled(false);
- defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
- defaultMQAdminExt.setInstanceName(this.pullConsumer.getInstanceName());
- try {
- defaultMQAdminExt.start();
- Map<MessageQueue, String> queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName);
- Map<String, List<ISplit>> instanceOwnerQueues = new HashMap<>();
- for (MessageQueue messageQueue : queue2Instances.keySet()) {
- RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId()));
- if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
- continue;
- }
- String instanceName = queue2Instances.get(messageQueue);
- List<ISplit> splits = instanceOwnerQueues.computeIfAbsent(instanceName, k -> new ArrayList<>());
- splits.add(metaqMessageQueue);
- }
- return instanceOwnerQueues;
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- defaultMQAdminExt.shutdown();
- }
- }
-
- private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
- String groupName) {
- HashMap<MessageQueue, String> results = new HashMap<>();
-
- try {
- ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
- Iterator iterator = consumerConnection.getConnectionSet().iterator();
-
- while (iterator.hasNext()) {
- Connection connection = (Connection) iterator.next();
- String clientId = connection.getClientId();
- ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
- Iterator iterator1 = consumerRunningInfo.getMqTable().keySet().iterator();
-
- while (iterator1.hasNext()) {
- MessageQueue messageQueue = (MessageQueue) iterator1.next();
- results.put(messageQueue, clientId.split("@")[1]);
- }
- }
- } catch (Exception ex) {
- ;
- }
-
- return results;
- }
-
@Override
protected boolean isNotDataSplit(String queueId) {
return false;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index a0e35353..a333923e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -471,9 +471,6 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
return null;
}
- public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
- return new HashMap<>();
- }
/**
* 当新增分片时,需要做的回调
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index 797c1c4c..6148ae0f 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -37,18 +37,11 @@ public class SplitEventTimeManager {
protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
protected static final Map<String, Long> messageSplitId2MaxTime = new HashMap<>();
private AtomicInteger queueIdCount = new AtomicInteger(0);
- protected Long lastUpdateTime;
protected volatile Integer allSplitSize;
- protected volatile Integer workingSplitSize = 0;
- protected Map<String, List<ISplit>> splitsGroupByInstance;
protected ISource source;
-
- protected volatile boolean isAllSplitReceived = false;
protected transient String queueId;
- private static Long splitReadyTime;
-
public SplitEventTimeManager(ISource source, String queueId) {
this.source = source;
this.queueId = queueId;
@@ -83,10 +76,6 @@ public class SplitEventTimeManager {
}
public Long getMaxEventTime() {
-
- if (!isSplitsReceiver()) {
- return null;
- }
Long min = null;
synchronized (messageSplitId2MaxTime) {
@@ -109,68 +98,7 @@ public class SplitEventTimeManager {
}
- protected boolean isSplitsReceiver() {
- if (isAllSplitReceived) {
- return true;
- }
- if (lastUpdateTime == null) {
- lastUpdateTime = System.currentTimeMillis();
- }
-
- if (allSplitSize == -1) {
- return true;
- }
- if (allSplitSize != -1 && allSplitSize > workingSplitSize) {
- if (System.currentTimeMillis() - lastUpdateTime > 1000) {
- workingSplitSize = calcuteWorkingSplitSize();
- lastUpdateTime = System.currentTimeMillis();
- if (allSplitSize > workingSplitSize) {
- return false;
- }
- }
-
- if (this.splitsGroupByInstance == null) {
- return false;
- }
- //add time out policy: no necessary waiting for other split
- if (splitReadyTime == null) {
- synchronized (this) {
- if (splitReadyTime == null) {
- splitReadyTime = System.currentTimeMillis();
- }
- }
- }
- if (System.currentTimeMillis() - splitReadyTime >= 1000 * 60) {
- this.isAllSplitReceived = true;
- return true;
- }
- }
-
- if (workingSplitSize == messageSplitId2MaxTime.size()) {
- this.isAllSplitReceived = true;
- return true;
- }
- return false;
- }
-
- private Integer calcuteWorkingSplitSize() {
- if (source instanceof AbstractSource) {
- AbstractSource abstractSource = (AbstractSource) source;
- Map<String, List<ISplit>> splits = abstractSource.getWorkingSplitsGroupByInstances();
- if (splits == null) {
- return 0;
- }
- this.splitsGroupByInstance = splits;
- int count = 0;
- for (List<ISplit> splitList : splits.values()) {
- count += splitList.size();
- }
- return count;
- }
- return 0;
-
- }
public void setSource(ISource source) {
this.source = source;