This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new f243ff46 fix(window) remove isSplitsReceiver
     new a4ff9b1f Merge pull request #223 from ni-ze/supportRsqldb
f243ff46 is described below

commit f243ff4674dfee2b6d9e983891739ebd3268e110
Author: 维章 <[email protected]>
AuthorDate: Tue Sep 27 16:37:01 2022 +0800

    fix(window) remove isSplitsReceiver
---
 .../rocketmq/streams/source/RocketMQSource.java    | 55 -----------------
 .../common/channel/source/AbstractSource.java      |  3 -
 .../streams/window/fire/SplitEventTimeManager.java | 72 ----------------------
 3 files changed, 130 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 2d199b54..4134c191 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -159,61 +159,6 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
         }
     }
 
-    //todo 计算正在工作的分片?
-    @Override
-    public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
-        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
-        defaultMQAdminExt.setVipChannelEnabled(false);
-        defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
-        defaultMQAdminExt.setInstanceName(this.pullConsumer.getInstanceName());
-        try {
-            defaultMQAdminExt.start();
-            Map<MessageQueue, String> queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName);
-            Map<String, List<ISplit>> instanceOwnerQueues = new HashMap<>();
-            for (MessageQueue messageQueue : queue2Instances.keySet()) {
-                RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId()));
-                if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
-                    continue;
-                }
-                String instanceName = queue2Instances.get(messageQueue);
-                List<ISplit> splits = instanceOwnerQueues.computeIfAbsent(instanceName, k -> new ArrayList<>());
-                splits.add(metaqMessageQueue);
-            }
-            return instanceOwnerQueues;
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            defaultMQAdminExt.shutdown();
-        }
-    }
-
-    private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
-                                                                      String groupName) {
-        HashMap<MessageQueue, String> results = new HashMap<>();
-
-        try {
-            ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
-            Iterator iterator = consumerConnection.getConnectionSet().iterator();
-
-            while (iterator.hasNext()) {
-                Connection connection = (Connection) iterator.next();
-                String clientId = connection.getClientId();
-                ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
-                Iterator iterator1 = consumerRunningInfo.getMqTable().keySet().iterator();
-
-                while (iterator1.hasNext()) {
-                    MessageQueue messageQueue = (MessageQueue) iterator1.next();
-                    results.put(messageQueue, clientId.split("@")[1]);
-                }
-            }
-        } catch (Exception ex) {
-            ;
-        }
-
-        return results;
-    }
-
     @Override
     protected boolean isNotDataSplit(String queueId) {
         return false;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index a0e35353..a333923e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -471,9 +471,6 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc
         return null;
     }
 
-    public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
-        return new HashMap<>();
-    }
 
     /**
      * 当新增分片时,需要做的回调
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index 797c1c4c..6148ae0f 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -37,18 +37,11 @@ public class SplitEventTimeManager {
     protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
     protected static final Map<String, Long> messageSplitId2MaxTime = new HashMap<>();
     private AtomicInteger queueIdCount = new AtomicInteger(0);
-    protected Long lastUpdateTime;
 
     protected volatile Integer allSplitSize;
-    protected volatile Integer workingSplitSize = 0;
-    protected Map<String, List<ISplit>> splitsGroupByInstance;
     protected ISource source;
-
-    protected volatile boolean isAllSplitReceived = false;
     protected transient String queueId;
 
-    private static Long splitReadyTime;
-
     public SplitEventTimeManager(ISource source, String queueId) {
         this.source = source;
         this.queueId = queueId;
@@ -83,10 +76,6 @@ public class SplitEventTimeManager {
     }
 
     public Long getMaxEventTime() {
-
-        if (!isSplitsReceiver()) {
-            return null;
-        }
         Long min = null;
 
         synchronized (messageSplitId2MaxTime) {
@@ -109,68 +98,7 @@ public class SplitEventTimeManager {
 
     }
 
-    protected boolean isSplitsReceiver() {
-        if (isAllSplitReceived) {
-            return true;
-        }
-        if (lastUpdateTime == null) {
-            lastUpdateTime = System.currentTimeMillis();
-        }
-
-        if (allSplitSize == -1) {
-            return true;
-        }
 
-        if (allSplitSize != -1 && allSplitSize > workingSplitSize) {
-            if (System.currentTimeMillis() - lastUpdateTime > 1000) {
-                workingSplitSize = calcuteWorkingSplitSize();
-                lastUpdateTime = System.currentTimeMillis();
-                if (allSplitSize > workingSplitSize) {
-                    return false;
-                }
-            }
-
-            if (this.splitsGroupByInstance == null) {
-                return false;
-            }
-            //add time out policy: no necessary waiting for other split
-            if (splitReadyTime == null) {
-                synchronized (this) {
-                    if (splitReadyTime == null) {
-                        splitReadyTime = System.currentTimeMillis();
-                    }
-                }
-            }
-            if (System.currentTimeMillis() - splitReadyTime >= 1000 * 60) {
-                this.isAllSplitReceived = true;
-                return true;
-            }
-        }
-
-        if (workingSplitSize == messageSplitId2MaxTime.size()) {
-            this.isAllSplitReceived = true;
-            return true;
-        }
-        return false;
-    }
-
-    private Integer calcuteWorkingSplitSize() {
-        if (source instanceof AbstractSource) {
-            AbstractSource abstractSource = (AbstractSource) source;
-            Map<String, List<ISplit>> splits = abstractSource.getWorkingSplitsGroupByInstances();
-            if (splits == null) {
-                return 0;
-            }
-            this.splitsGroupByInstance = splits;
-            int count = 0;
-            for (List<ISplit> splitList : splits.values()) {
-                count += splitList.size();
-            }
-            return count;
-        }
-        return 0;
-
-    }
 
     public void setSource(ISource source) {
         this.source = source;

Reply via email to