This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4 in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit aadf7eaa042ba3cc6c89433bc74bc99752c1c149 Author: 维章 <[email protected]> AuthorDate: Tue May 31 10:51:56 2022 +0800 make RocketMQWindowExample runnable --- .../apache/rocketmq/streams/common/utils/KryoUtil.java | 4 ---- .../rocketmq/streams/common/utils/SerializeUtil.java | 6 ------ .../streams/window/operator/AbstractShuffleWindow.java | 17 ++--------------- .../rocketmq/streams/window/shuffle/ShuffleChannel.java | 9 +++++++++ 4 files changed, 11 insertions(+), 25 deletions(-) diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java index 538cb027..d6fa1072 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java @@ -50,10 +50,6 @@ public class KryoUtil { //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册) kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置 - //Fix the NPE bug when deserializing Collections. 
-// ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) -// .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - return kryo; } }; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java index 7def6f7e..60691cc6 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java @@ -33,7 +33,6 @@ import org.apache.rocketmq.streams.common.datatype.ArrayDataType; import org.apache.rocketmq.streams.common.datatype.DataType; import org.apache.rocketmq.streams.common.datatype.StringDataType; import org.apache.rocketmq.streams.common.interfaces.ISerialize; -import org.nustaq.serialization.FSTConfiguration; public class SerializeUtil { private static final Log LOG = LogFactory.getLog(SerializeUtil.class); @@ -44,11 +43,6 @@ public class SerializeUtil { * @return */ public static byte[] serialize(Object object) { - if(ISerialize.class.isInstance(object)){ -// byte[] bytes = conf.asByteArray(object); -// return bytes; - return KryoUtil.writeObjectToByteArray(object); - } DataType dataType = DataTypeUtil.getDataTypeFromClass(object.getClass()); if (ArrayDataType.class.isInstance(dataType)) { int length = Array.getLength(object); diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java index ea3b923a..070a40f0 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java @@ -76,21 +76,7 @@ public 
abstract class AbstractShuffleWindow extends AbstractWindow { @Override public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) { - if (hasCreated.get()==false||this.shuffleChannel==null) { - synchronized (this){ - if(hasCreated.get()==false||this.shuffleChannel==null){ - this.windowFireSource = new WindowTrigger(this); - this.windowFireSource.init(); - this.windowFireSource.start(getFireReceiver()); - this.shuffleChannel = new ShuffleChannel(this); - this.shuffleChannel.init(); - windowCache.setBatchSize(5000); - windowCache.setShuffleChannel(shuffleChannel); - shuffleChannel.startChannel(); - hasCreated.set(true); - } - } - } + shuffleChannel.startChannel(); return super.doMessage(message, context); } @@ -99,6 +85,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow { Set<String> splitIds = new HashSet<>(); splitIds.add(windowInstance.getSplitId()); shuffleChannel.flush(splitIds); + return doFireWindowInstance(windowInstance); } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java index 107f1ce5..a5c150a1 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java @@ -60,6 +60,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID; @@ -110,6 +111,14 @@ public class ShuffleChannel extends AbstractSystemChannel { } + protected transient AtomicBoolean hasStart = new AtomicBoolean(false); + + @Override + public void startChannel() { 
+ if (hasStart.compareAndSet(false, true)) { + super.startChannel(); + } + } /** * init shuffle channel
