This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 710933f6 add exception
     new 2664360e Merge pull request #242 from ni-ze/newWorld-4
710933f6 is described below

commit 710933f6281488315a12de706708842e5687162f
Author: 维章 <[email protected]>
AuthorDate: Wed Jan 4 10:59:09 2023 +0800

    add exception
---
 .../rocketmq/streams/core/RocketMQStream.java      |   6 +-
 .../rocketmq/streams/core/common/Constant.java     |  14 +--
 .../DataProcessThrowable.java}                     |  22 +++-
 .../DeserializeThrowable.java}                     |  24 +++-
 .../RStreamsException.java}                        |  24 +++-
 .../RecoverStateStoreThrowable.java}               |  22 +++-
 .../function/supplier/AccumulatorSupplier.java     |   3 +-
 .../core/function/supplier/AggregateSupplier.java  |   3 +-
 .../function/supplier/JoinAggregateSupplier.java   |   3 +-
 .../supplier/JoinWindowAggregateSupplier.java      |   3 +-
 .../core/function/supplier/SinkSupplier.java       |   6 +-
 .../core/function/supplier/SourceSupplier.java     |  13 ++-
 .../supplier/WindowAccumulatorSupplier.java        |   5 +-
 .../function/supplier/WindowAggregateSupplier.java |   5 +-
 .../streams/core/running/AbstractProcessor.java    |  11 +-
 .../core/running/AbstractWindowProcessor.java      |   2 -
 .../rocketmq/streams/core/running/Processor.java   |   6 +-
 .../streams/core/running/StreamContextImpl.java    |   1 +
 .../streams/core/running/WorkerThread.java         | 125 ++++++++++++---------
 .../serialization/deImpl/KVJsonDeserializer.java   |   1 -
 .../rocketmq/streams/core/state/RocketMQStore.java |  11 +-
 .../rocketmq/streams/core/state/RocksDBStore.java  |   4 +-
 .../rocketmq/streams/core/state/StateStore.java    |   3 +-
 23 files changed, 196 insertions(+), 121 deletions(-)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java
index 34642000..e616964c 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java
@@ -16,6 +16,7 @@ package org.apache.rocketmq.streams.core;
  * limitations under the License.
  */
 
+import org.apache.rocketmq.streams.core.common.Constant;
 import org.apache.rocketmq.streams.core.metadata.StreamConfig;
 import org.apache.rocketmq.streams.core.running.WorkerThread;
 import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
@@ -48,7 +49,10 @@ public class RocketMQStream {
         try {
             int threadNum = StreamConfig.STREAMS_PARALLEL_THREAD_NUM;
             for (int i = 0; i < threadNum; i++) {
-                WorkerThread thread = new WorkerThread(topologyBuilder, this.properties);
+                String threadName = String.join("_", Constant.WORKER_THREAD_NAME, String.valueOf(i));
+
+                WorkerThread thread = new WorkerThread(threadName, topologyBuilder, this.properties);
+
                 thread.start();
                 workerThreads.add(thread);
             }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java b/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java
index c3839331..f90d3dc5 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java
@@ -28,9 +28,10 @@ public class Constant {
 
     public static final String SHUFFLE_TOPIC_SUFFIX = "-shuffleTopic";
 
-
     public static final String TIME_TYPE = "timeType";
 
+    public static final String SKIP_DATA_ERROR = "skip_data_error";
+
     public static final String ALLOW_LATENESS_MILLISECOND = "allowLatenessMillisecond";
 
     public static final String SPLIT = "@";
@@ -43,17 +44,10 @@ public class Constant {
 
     public static final String STREAM_TAG = "stream_tag";
 
-    public static final String WINDOW_FOR_JOIN = "window_for_join";
-
-    public static final String WINDOW_JOIN_TYPE = "window_join_type";
-
-    public static final String JOIN_STREAM_SIDE = "join_stream_side";
-    public static final String JOIN_COMMON_SHUFFLE_TOPIC = "join_common_shuffle_topic";
-
-    public static final String COMMON_NAME_MAKER = "common_name_maker";
-
     public static final String WINDOW_START_TIME = "window_start_time";
 
     public static final String WINDOW_END_TIME = "window_end_time";
 
+    public static final String WORKER_THREAD_NAME = "worker_thread";
+
 }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DataProcessThrowable.java
similarity index 56%
copy from core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to core/src/main/java/org/apache/rocketmq/streams/core/exception/DataProcessThrowable.java
index 8b675f61..db96cfec 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DataProcessThrowable.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,12 +14,25 @@ package org.apache.rocketmq.streams.core.running;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.streams.core.exception;
 
-public interface Processor<T> extends AutoCloseable {
-    void addChild(Processor<T> processor);
+public class DataProcessThrowable extends Throwable {
+    public DataProcessThrowable() {
+    }
 
+    public DataProcessThrowable(String message) {
+        super(message);
+    }
 
-    void preProcess(StreamContext<T> context) throws Throwable;
+    public DataProcessThrowable(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-    void process(T data) throws Throwable;
+    public DataProcessThrowable(Throwable cause) {
+        super(cause);
+    }
+
+    public DataProcessThrowable(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DeserializeThrowable.java
similarity index 54%
copy from 
core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to 
core/src/main/java/org/apache/rocketmq/streams/core/exception/DeserializeThrowable.java
index 8b675f61..4a14f506 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DeserializeThrowable.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,12 +14,27 @@ package org.apache.rocketmq.streams.core.running;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.streams.core.exception;
 
-public interface Processor<T> extends AutoCloseable {
-    void addChild(Processor<T> processor);
+public class DeserializeThrowable extends Throwable {
+    private static final long serialVersionUID = 2154421351264920776L;
 
+    public DeserializeThrowable() {
+    }
 
-    void preProcess(StreamContext<T> context) throws Throwable;
+    public DeserializeThrowable(String message) {
+        super(message);
+    }
 
-    void process(T data) throws Throwable;
+    public DeserializeThrowable(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DeserializeThrowable(Throwable cause) {
+        super(cause);
+    }
+
+    public DeserializeThrowable(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RStreamsException.java
similarity index 54%
copy from 
core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to 
core/src/main/java/org/apache/rocketmq/streams/core/exception/RStreamsException.java
index 8b675f61..dc361c22 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RStreamsException.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,12 +14,27 @@ package org.apache.rocketmq.streams.core.running;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.streams.core.exception;
 
-public interface Processor<T> extends AutoCloseable {
-    void addChild(Processor<T> processor);
+public class RStreamsException extends RuntimeException {
+    private static final long serialVersionUID = 6729806497659471678L;
 
+    public RStreamsException() {
+    }
 
-    void preProcess(StreamContext<T> context) throws Throwable;
+    public RStreamsException(String message) {
+        super(message);
+    }
 
-    void process(T data) throws Throwable;
+    public RStreamsException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RStreamsException(Throwable cause) {
+        super(cause);
+    }
+
+    public RStreamsException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RecoverStateStoreThrowable.java
similarity index 55%
copy from 
core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to 
core/src/main/java/org/apache/rocketmq/streams/core/exception/RecoverStateStoreThrowable.java
index 8b675f61..27478555 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RecoverStateStoreThrowable.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,12 +14,25 @@ package org.apache.rocketmq.streams.core.running;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.streams.core.exception;
 
-public interface Processor<T> extends AutoCloseable {
-    void addChild(Processor<T> processor);
+public class RecoverStateStoreThrowable extends Throwable {
+    public RecoverStateStoreThrowable() {
+    }
 
+    public RecoverStateStoreThrowable(String message) {
+        super(message);
+    }
 
-    void preProcess(StreamContext<T> context) throws Throwable;
+    public RecoverStateStoreThrowable(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-    void process(T data) throws Throwable;
+    public RecoverStateStoreThrowable(Throwable cause) {
+        super(cause);
+    }
+
+    public RecoverStateStoreThrowable(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
index 5a6f3df6..a91c754d 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
 import org.apache.rocketmq.streams.core.function.SelectAction;
 import org.apache.rocketmq.streams.core.metadata.Data;
@@ -62,7 +63,7 @@ public class AccumulatorSupplier<K, V, R, OV> implements 
Supplier<Processor<V>>
         }
 
         @Override
-        public void preProcess(StreamContext<V> context) throws Throwable {
+        public void preProcess(StreamContext<V> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             this.stateStore = super.waitStateReplay();
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
index 99c9e5ac..0bb30e5a 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.AggregateAction;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.running.AbstractProcessor;
@@ -63,7 +64,7 @@ public class AggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>> {
         }
 
         @Override
-        public void preProcess(StreamContext<V> context) throws Throwable {
+        public void preProcess(StreamContext<V> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             this.stateStore = super.waitStateReplay();
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
index 8958012d..60fe3644 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.ValueJoinAction;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.running.AbstractProcessor;
@@ -66,7 +67,7 @@ public class JoinAggregateSupplier<K, V1, V2, OUT> implements 
Supplier<Processor
         }
 
         @Override
-        public void preProcess(StreamContext<Object> context) throws Throwable 
{
+        public void preProcess(StreamContext<Object> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             this.stateStore = super.waitStateReplay();
             String stateTopicName = getSourceTopic() + 
Constant.STATE_TOPIC_SUFFIX;
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
index 363c8bfd..9c7cb312 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.ValueJoinAction;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
@@ -79,7 +80,7 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT> 
implements Supplier<Pro
         }
 
         @Override
-        public void preProcess(StreamContext<Object> context) throws Throwable 
{
+        public void preProcess(StreamContext<Object> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             leftWindowStore = new WindowStore<>(super.waitStateReplay(), 
WindowState::byte2WindowState, WindowState::windowState2Byte);
             rightWindowStore = new WindowStore<>(super.waitStateReplay(), 
WindowState::byte2WindowState, WindowState::windowState2Byte);
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
index cccfff94..46f2eeea 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
@@ -63,15 +63,14 @@ public class SinkSupplier<K, T> implements 
Supplier<Processor<T>> {
         @Override
         public void process(T data) throws Throwable {
             if (data != null) {
-                Message message;
-
-                //todo 异常体系,哪些可以不必中断线程,哪些是需要中断的?
                 byte[] value = this.serializer.serialize(key, data);
                 if (value == null) {
                     //目前RocketMQ不支持发送body为null的消息;
                     return;
                 }
 
+                Message message;
+
                 if (this.key == null) {
                     message = new Message(this.topicName, value);
 
@@ -91,7 +90,6 @@ public class SinkSupplier<K, T> implements 
Supplier<Processor<T>> {
                     message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, 
this.key.getClass().getName());
                     message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, 
data.getClass().getName());
 
-                    //todo 丑陋
                     if 
(this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                         message.putUserProperty(Constant.SOURCE_TIMESTAMP, 
String.valueOf(this.context.getDataTime()));
                     }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
index 00435845..ec7decbd 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.streams.core.exception.DeserializeThrowable;
 import org.apache.rocketmq.streams.core.running.AbstractProcessor;
 import org.apache.rocketmq.streams.core.running.Processor;
 import org.apache.rocketmq.streams.core.window.TimeType;
@@ -40,7 +41,7 @@ public class SourceSupplier<K, V> implements 
Supplier<Processor<V>> {
     }
 
     public interface SourceProcessor<K, V> extends Processor<V> {
-        Pair<K, V> deserialize(String keyClass, String valueClass, byte[] 
data) throws Throwable;
+        Pair<K, V> deserialize(String keyClass, String valueClass, byte[] 
data) throws DeserializeThrowable;
 
         long getTimestamp(MessageExt originData, TimeType timeType);
 
@@ -59,9 +60,13 @@ public class SourceSupplier<K, V> implements 
Supplier<Processor<V>> {
         }
 
         @Override
-        public Pair<K, V> deserialize(String keyClass, String valueClass, 
byte[] data) throws Throwable {
-            this.deserializer.configure(keyClass, valueClass);
-            return this.deserializer.deserialize(data);
+        public Pair<K, V> deserialize(String keyClass, String valueClass, 
byte[] data) throws DeserializeThrowable {
+            try {
+                this.deserializer.configure(keyClass, valueClass);
+                return this.deserializer.deserialize(data);
+            }catch (Throwable t) {
+                throw new DeserializeThrowable(t);
+            }
         }
 
         @Override
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
index 99c11211..e340e022 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.SelectAction;
 import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
 import org.apache.rocketmq.streams.core.metadata.Data;
@@ -87,7 +88,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
         }
 
         @Override
-        public void preProcess(StreamContext<V> context) throws Throwable {
+        public void preProcess(StreamContext<V> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             this.windowStore = new WindowStore<>(super.waitStateReplay(), 
WindowState::byte2WindowState, WindowState::windowState2Byte);
 
@@ -169,7 +170,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
         }
 
         @Override
-        public void preProcess(StreamContext<V> context) throws Throwable {
+        public void preProcess(StreamContext<V> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             super.windowStore = new WindowStore<>(super.waitStateReplay(), 
WindowState::byte2WindowState, WindowState::windowState2Byte);
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
index 9b36d529..d6ca254e 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.AggregateAction;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
@@ -86,7 +87,7 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
         }
 
         @Override
-        public void preProcess(StreamContext<V> context) throws Throwable {
+        public void preProcess(StreamContext<V> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             this.windowStore = new WindowStore<>(super.waitStateReplay(), 
WindowState::byte2WindowState, WindowState::windowState2Byte);
 
@@ -172,7 +173,7 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
         }
 
         @Override
-        public void preProcess(StreamContext<V> context) throws Throwable {
+        public void preProcess(StreamContext<V> context) throws 
RecoverStateStoreThrowable {
             super.preProcess(context);
             super.windowStore = new WindowStore<>(super.waitStateReplay(), 
WindowState::byte2WindowState, WindowState::windowState2Byte);
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
index f5094992..2b7e0952 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.state.StateStore;
 import org.apache.rocketmq.streams.core.util.Utils;
@@ -42,7 +43,7 @@ public abstract class AbstractProcessor<T> implements 
Processor<T> {
     }
 
     @Override
-    public void preProcess(StreamContext<T> context) throws Throwable {
+    public void preProcess(StreamContext<T> context) throws 
RecoverStateStoreThrowable {
         this.context = context;
         this.context.init(getChildren());
     }
@@ -51,7 +52,7 @@ public abstract class AbstractProcessor<T> implements 
Processor<T> {
         return Collections.unmodifiableList(children);
     }
 
-    protected StateStore waitStateReplay() throws Throwable {
+    protected StateStore waitStateReplay() throws RecoverStateStoreThrowable {
         MessageQueue sourceTopicQueue = new MessageQueue(getSourceTopic(), 
getSourceBrokerName(), getSourceQueueId());
 
         StateStore stateStore = context.getStateStore();
@@ -59,17 +60,11 @@ public abstract class AbstractProcessor<T> implements 
Processor<T> {
         return stateStore;
     }
 
-
-
     @SuppressWarnings("unchecked")
     protected <KEY> Data<KEY, T> convert(Data<?, ?> data) {
         return (Data<KEY, T>) new Data<>(data.getKey(), data.getValue(), 
data.getTimestamp(), data.getHeader());
     }
 
-    @Override
-    public void close() throws Exception {
-
-    }
 
     protected String getSourceBrokerName() {
         String sourceTopicQueue = 
context.getMessageFromWhichSourceTopicQueue();
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
index 21b32a22..ca39c480 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
@@ -26,8 +26,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 public abstract class AbstractWindowProcessor<V> extends AbstractProcessor<V> {
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractWindowProcessor.class.getName());
-
 
     protected List<Window> calculateWindow(WindowInfo windowInfo, long 
valueTime) {
         long sizeInterval = windowInfo.getWindowSize().toMillSecond();
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
index 8b675f61..515dbb40 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
@@ -16,11 +16,13 @@ package org.apache.rocketmq.streams.core.running;
  * limitations under the License.
  */
 
-public interface Processor<T> extends AutoCloseable {
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
+
+public interface Processor<T> {
     void addChild(Processor<T> processor);
 
 
-    void preProcess(StreamContext<T> context) throws Throwable;
+    void preProcess(StreamContext<T> context) throws 
RecoverStateStoreThrowable;
 
     void process(T data) throws Throwable;
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
index c2609714..fc0d5250 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
@@ -17,6 +17,7 @@ package org.apache.rocketmq.streams.core.running;
  */
 
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.streams.core.exception.DataProcessThrowable;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.state.StateStore;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
index c94389d2..1fa86671 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
@@ -25,6 +25,9 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.DataProcessThrowable;
+import org.apache.rocketmq.streams.core.exception.DeserializeThrowable;
+import org.apache.rocketmq.streams.core.exception.RStreamsException;
 import org.apache.rocketmq.streams.core.function.supplier.SourceSupplier;
 import org.apache.rocketmq.streams.core.metadata.Data;
 import org.apache.rocketmq.streams.core.metadata.StreamConfig;
@@ -55,7 +58,8 @@ public class WorkerThread extends Thread {
     private final Properties properties;
 
 
-    public WorkerThread(TopologyBuilder topologyBuilder, Properties 
properties) throws MQClientException {
+    public WorkerThread(String threadName, TopologyBuilder topologyBuilder, 
Properties properties) throws MQClientException {
+        super(threadName);
         this.topologyBuilder = topologyBuilder;
         this.properties = properties;
         String groupName = topologyBuilder.getJobId() + "_" + 
ROCKETMQ_STREAMS_CONSUMER_GROUP;
@@ -87,10 +91,10 @@ public class WorkerThread extends Thread {
 
             this.planetaryEngine.runInLoop();
         } catch (Throwable e) {
-            logger.error("planetaryEngine error.", e);
-            throw new RuntimeException(e);
+            logger.error("worker thread=[{}], error:{}.", this.getName(), e);
+            throw new RStreamsException(e);
         } finally {
-            logger.info("planetaryEngine stop.");
+            logger.info("worker thread=[{}], engin stopped.", this.getName());
             this.planetaryEngine.stop();
         }
     }
@@ -128,7 +132,6 @@ public class WorkerThread extends Thread {
         }
 
 
-        //处理
         void start() throws Throwable {
             createShuffleTopic();
 
@@ -139,68 +142,79 @@ public class WorkerThread extends Thread {
 
         void runInLoop() throws Throwable {
             while (!stop) {
-                List<MessageExt> list = this.unionConsumer.poll(0);
-                if (list.size() == 0) {
-                    Thread.sleep(10);
-                    continue;
-                }
-
                 HashSet<MessageQueue> set = new HashSet<>();
-                for (MessageExt messageExt : list) {
-                    byte[] body = messageExt.getBody();
-                    if (body == null || body.length == 0) {
-                        continue;
-                    }
-
-                    String keyClassName = 
messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
-                    String valueClassName = 
messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);
-
-                    String topic = messageExt.getTopic();
-                    int queueId = messageExt.getQueueId();
-                    String brokerName = messageExt.getBrokerName();
-                    MessageQueue queue = new MessageQueue(topic, brokerName, 
queueId);
-                    set.add(queue);
-                    logger.debug("source topic queue:[{}]", queue);
-
-
-                    String key = Utils.buildKey(brokerName, topic, queueId);
-                    SourceSupplier.SourceProcessor<K, V> processor = 
(SourceSupplier.SourceProcessor<K, V>) wrapper.selectProcessor(key);
-
-                    StreamContextImpl<V> context = new 
StreamContextImpl<>(producer, mqAdmin, stateStore, key);
 
-                    processor.preProcess(context);
-
-                    Pair<K, V> pair = processor.deserialize(keyClassName, 
valueClassName, body);
-
-                    long timestamp;
-                    String userProperty = 
messageExt.getUserProperty(Constant.SOURCE_TIMESTAMP);
-                    if (!StringUtils.isEmpty(userProperty)) {
-                        timestamp = Long.parseLong(userProperty);
-                    } else {
-                        timestamp = processor.getTimestamp(messageExt, 
(TimeType) properties.get(Constant.TIME_TYPE));
+                try {
+                    List<MessageExt> list = this.unionConsumer.poll(10);
+                    for (MessageExt messageExt : list) {
+                        byte[] body = messageExt.getBody();
+                        if (body == null || body.length == 0) {
+                            continue;
+                        }
+
+                        String keyClassName = 
messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
+                        String valueClassName = 
messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);
+
+                        String topic = messageExt.getTopic();
+                        int queueId = messageExt.getQueueId();
+                        String brokerName = messageExt.getBrokerName();
+                        MessageQueue queue = new MessageQueue(topic, 
brokerName, queueId);
+                        set.add(queue);
+                        logger.debug("source topic queue:[{}]", queue);
+
+
+                        String key = Utils.buildKey(brokerName, topic, 
queueId);
+                        SourceSupplier.SourceProcessor<K, V> processor = 
(SourceSupplier.SourceProcessor<K, V>) wrapper.selectProcessor(key);
+
+                        StreamContextImpl<V> context = new 
StreamContextImpl<>(producer, mqAdmin, stateStore, key);
+
+                        processor.preProcess(context);
+
+                        Pair<K, V> pair = processor.deserialize(keyClassName, 
valueClassName, body);
+
+                        long timestamp;
+                        String userProperty = 
messageExt.getUserProperty(Constant.SOURCE_TIMESTAMP);
+                        if (!StringUtils.isEmpty(userProperty)) {
+                            timestamp = Long.parseLong(userProperty);
+                        } else {
+                            timestamp = processor.getTimestamp(messageExt, 
(TimeType) properties.get(Constant.TIME_TYPE));
+                        }
+
+                        String delay = 
properties.getProperty(Constant.ALLOW_LATENESS_MILLISECOND, "0");
+                        long watermark = processor.getWatermark(timestamp, 
Long.parseLong(delay));
+                        context.setWatermark(watermark);
+
+                        Data<K, V> data = new Data<>(pair.getKey(), 
pair.getValue(), timestamp, new Properties());
+                        context.setKey(pair.getKey());
+                        if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
+                            logger.debug("shuffle data: [{}]", data);
+                        } else {
+                            logger.debug("source data: [{}]", data);
+                        }
+
+                        try {
+                            context.forward(data);
+                        } catch (Throwable t) {
+                            logger.error("process error.", t);
+                            throw new DataProcessThrowable(t);
+                        }
                     }
 
-                    String delay = 
properties.getProperty(Constant.ALLOW_LATENESS_MILLISECOND, "0");
-                    long watermark = processor.getWatermark(timestamp, 
Long.parseLong(delay));
-                    context.setWatermark(watermark);
-
-                    Data<K, V> data = new Data<>(pair.getKey(), 
pair.getValue(), timestamp, new Properties());
-                    context.setKey(pair.getKey());
-                    if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
-                        logger.debug("shuffle data: [{}]", data);
+                } catch (Throwable t) {
+                    Object skipDataError = 
properties.get(Constant.SKIP_DATA_ERROR);
+                    if (skipDataError == Boolean.TRUE && t instanceof 
DataProcessThrowable || t instanceof DeserializeThrowable) {
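+                        // ignored on purpose; falls through to persist/commit. NOTE(review): && binds tighter than ||, so DeserializeThrowable is ignored even when SKIP_DATA_ERROR is unset - confirm intent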
                     } else {
-                        logger.debug("source data: [{}]", data);
+                        throw t;
                     }
-                    context.forward(data);
                 }
 
                 //todo 每次都提交位点消耗太大,后面改成拉取消息放入buffer的形式。
                 for (MessageQueue messageQueue : set) {
                     logger.debug("commit messageQueue: [{}]", messageQueue);
                 }
-                this.unionConsumer.commit(set, true);
                 this.stateStore.persist(set);
-                //todo 提交消费位点、写出sink数据、写出状态、需要保持原子
+                this.unionConsumer.commit(set, true);
             }
         }
 
@@ -225,11 +239,12 @@ public class WorkerThread extends Thread {
             this.stop = true;
 
             try {
+                this.stateStore.close();
                 this.unionConsumer.shutdown();
                 this.producer.shutdown();
                 this.mqAdmin.shutdown();
             } catch (Throwable e) {
-                logger.error("error when stop.", e);
+                logger.error("error when stop engin.", e);
             }
         }
     }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
index 0193d158..dbeda9f5 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
@@ -39,7 +39,6 @@ public class KVJsonDeserializer<K, V> extends ShuffleProtocol 
implements KeyValu
         if (!StringUtils.isEmpty(valueClassName)) {
             valueType = (Class<V>) Class.forName(valueClassName);
         }
-
     }
 
     @Override
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
index 3a533eac..44e04c52 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.ValueMapperAction;
 import org.apache.rocketmq.streams.core.metadata.StreamConfig;
 import org.apache.rocketmq.streams.core.window.WindowKey;
@@ -61,7 +62,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
     private final RocksDBStore rocksDBStore;
     private final Properties properties;
 
-    private final ExecutorService executor = Executors.newFixedThreadPool(4);
+    private final ExecutorService executor = Executors.newFixedThreadPool(8);
     private final ShuffleProtocol protocol = new ShuffleProtocol();
 
     private final ConcurrentHashMap<MessageQueue/*messageQueue of state 
topic*/, CountDownLatch2> recoveringQueueMutex = new ConcurrentHashMap<>();
@@ -84,7 +85,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
     }
 
     @Override
-    public void waitIfNotReady(MessageQueue messageQueue) throws Throwable {
+    public void waitIfNotReady(MessageQueue messageQueue) throws 
RecoverStateStoreThrowable {
         MessageQueue stateTopicQueue = 
convertSourceTopicQueue2StateTopicQueue(messageQueue);
         CountDownLatch2 waitPoint = 
this.recoveringQueueMutex.get(stateTopicQueue);
 
@@ -94,6 +95,8 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
             start = System.currentTimeMillis();
             waitPoint.await(5000, TimeUnit.MILLISECONDS);
             end = System.currentTimeMillis();
+        } catch (Throwable t) {
+            throw new RecoverStateStoreThrowable(t);
         } finally {
             long cost = end - start;
             if (cost > 2000) {
@@ -198,6 +201,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
                 try {
                     logger.debug("persist key: " + new String(key, 
StandardCharsets.UTF_8) + ",messageQueue: " + stateTopicQueue);
                 } catch (Throwable t) {
+                    // ignored: this debug log is best-effort; the key may not be a printable string.
                 }
 
                 this.producer.send(message, stateTopicQueue);
@@ -415,6 +419,7 @@ public class RocketMQStore extends AbstractStore implements 
StateStore {
 
     @Override
     public void close() throws Exception {
-
+        this.rocksDBStore.close();
+        this.executor.shutdown();
     }
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java
index dc3753d5..1ad4fae0 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java
@@ -36,7 +36,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
-public class RocksDBStore extends AbstractStore {
+public class RocksDBStore extends AbstractStore implements AutoCloseable {
     private static final String ROCKSDB_PATH = "/tmp/rocksdb";
     private RocksDB rocksDB;
     private WriteOptions writeOptions;
@@ -154,7 +154,7 @@ public class RocksDBStore extends AbstractStore {
     }
 
     public void close() throws Exception {
-
+        this.rocksDB.close();
     }
 
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
index ad967ac9..4a0ce0b7 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
@@ -17,6 +17,7 @@ package org.apache.rocketmq.streams.core.state;
  */
 
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
 import org.apache.rocketmq.streams.core.function.ValueMapperAction;
 import org.apache.rocketmq.streams.core.window.WindowKey;
 import org.apache.rocketmq.streams.core.util.Pair;
@@ -40,7 +41,7 @@ public interface StateStore extends AutoCloseable {
      * @throws Throwable
      */
     //如果没准备好,会阻塞
-    void waitIfNotReady(MessageQueue messageQueue) throws Throwable;
+    void waitIfNotReady(MessageQueue messageQueue) throws 
RecoverStateStoreThrowable;
 
 
     byte[] get(byte[] key) throws Throwable;


Reply via email to