This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 710933f6 add exception
new 2664360e Merge pull request #242 from ni-ze/newWorld-4
710933f6 is described below
commit 710933f6281488315a12de706708842e5687162f
Author: 维章 <[email protected]>
AuthorDate: Wed Jan 4 10:59:09 2023 +0800
add exception
---
.../rocketmq/streams/core/RocketMQStream.java | 6 +-
.../rocketmq/streams/core/common/Constant.java | 14 +--
.../DataProcessThrowable.java} | 22 +++-
.../DeserializeThrowable.java} | 24 +++-
.../RStreamsException.java} | 24 +++-
.../RecoverStateStoreThrowable.java} | 22 +++-
.../function/supplier/AccumulatorSupplier.java | 3 +-
.../core/function/supplier/AggregateSupplier.java | 3 +-
.../function/supplier/JoinAggregateSupplier.java | 3 +-
.../supplier/JoinWindowAggregateSupplier.java | 3 +-
.../core/function/supplier/SinkSupplier.java | 6 +-
.../core/function/supplier/SourceSupplier.java | 13 ++-
.../supplier/WindowAccumulatorSupplier.java | 5 +-
.../function/supplier/WindowAggregateSupplier.java | 5 +-
.../streams/core/running/AbstractProcessor.java | 11 +-
.../core/running/AbstractWindowProcessor.java | 2 -
.../rocketmq/streams/core/running/Processor.java | 6 +-
.../streams/core/running/StreamContextImpl.java | 1 +
.../streams/core/running/WorkerThread.java | 125 ++++++++++++---------
.../serialization/deImpl/KVJsonDeserializer.java | 1 -
.../rocketmq/streams/core/state/RocketMQStore.java | 11 +-
.../rocketmq/streams/core/state/RocksDBStore.java | 4 +-
.../rocketmq/streams/core/state/StateStore.java | 3 +-
23 files changed, 196 insertions(+), 121 deletions(-)
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java
index 34642000..e616964c 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/RocketMQStream.java
@@ -16,6 +16,7 @@ package org.apache.rocketmq.streams.core;
* limitations under the License.
*/
+import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.running.WorkerThread;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
@@ -48,7 +49,10 @@ public class RocketMQStream {
try {
int threadNum = StreamConfig.STREAMS_PARALLEL_THREAD_NUM;
for (int i = 0; i < threadNum; i++) {
- WorkerThread thread = new WorkerThread(topologyBuilder,
this.properties);
+ String threadName = String.join("_",
Constant.WORKER_THREAD_NAME, String.valueOf(i));
+
+ WorkerThread thread = new WorkerThread(threadName,
topologyBuilder, this.properties);
+
thread.start();
workerThreads.add(thread);
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java b/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java
index c3839331..f90d3dc5 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java
@@ -28,9 +28,10 @@ public class Constant {
public static final String SHUFFLE_TOPIC_SUFFIX = "-shuffleTopic";
-
public static final String TIME_TYPE = "timeType";
+ public static final String SKIP_DATA_ERROR = "skip_data_error";
+
public static final String ALLOW_LATENESS_MILLISECOND =
"allowLatenessMillisecond";
public static final String SPLIT = "@";
@@ -43,17 +44,10 @@ public class Constant {
public static final String STREAM_TAG = "stream_tag";
- public static final String WINDOW_FOR_JOIN = "window_for_join";
-
- public static final String WINDOW_JOIN_TYPE = "window_join_type";
-
- public static final String JOIN_STREAM_SIDE = "join_stream_side";
- public static final String JOIN_COMMON_SHUFFLE_TOPIC =
"join_common_shuffle_topic";
-
- public static final String COMMON_NAME_MAKER = "common_name_maker";
-
public static final String WINDOW_START_TIME = "window_start_time";
public static final String WINDOW_END_TIME = "window_end_time";
+ public static final String WORKER_THREAD_NAME = "worker_thread";
+
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DataProcessThrowable.java
similarity index 56%
copy from core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to core/src/main/java/org/apache/rocketmq/streams/core/exception/DataProcessThrowable.java
index 8b675f61..db96cfec 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DataProcessThrowable.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,12 +14,25 @@ package org.apache.rocketmq.streams.core.running;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.streams.core.exception;
-public interface Processor<T> extends AutoCloseable {
- void addChild(Processor<T> processor);
+public class DataProcessThrowable extends Throwable {
+ public DataProcessThrowable() {
+ }
+ public DataProcessThrowable(String message) {
+ super(message);
+ }
- void preProcess(StreamContext<T> context) throws Throwable;
+ public DataProcessThrowable(String message, Throwable cause) {
+ super(message, cause);
+ }
- void process(T data) throws Throwable;
+ public DataProcessThrowable(Throwable cause) {
+ super(cause);
+ }
+
+ public DataProcessThrowable(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DeserializeThrowable.java
similarity index 54%
copy from core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to core/src/main/java/org/apache/rocketmq/streams/core/exception/DeserializeThrowable.java
index 8b675f61..4a14f506 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/exception/DeserializeThrowable.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,12 +14,27 @@ package org.apache.rocketmq.streams.core.running;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.streams.core.exception;
-public interface Processor<T> extends AutoCloseable {
- void addChild(Processor<T> processor);
+public class DeserializeThrowable extends Throwable {
+ private static final long serialVersionUID = 2154421351264920776L;
+ public DeserializeThrowable() {
+ }
- void preProcess(StreamContext<T> context) throws Throwable;
+ public DeserializeThrowable(String message) {
+ super(message);
+ }
- void process(T data) throws Throwable;
+ public DeserializeThrowable(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DeserializeThrowable(Throwable cause) {
+ super(cause);
+ }
+
+ public DeserializeThrowable(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RStreamsException.java
similarity index 54%
copy from core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to core/src/main/java/org/apache/rocketmq/streams/core/exception/RStreamsException.java
index 8b675f61..dc361c22 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RStreamsException.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,12 +14,27 @@ package org.apache.rocketmq.streams.core.running;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.streams.core.exception;
-public interface Processor<T> extends AutoCloseable {
- void addChild(Processor<T> processor);
+public class RStreamsException extends RuntimeException {
+ private static final long serialVersionUID = 6729806497659471678L;
+ public RStreamsException() {
+ }
- void preProcess(StreamContext<T> context) throws Throwable;
+ public RStreamsException(String message) {
+ super(message);
+ }
- void process(T data) throws Throwable;
+ public RStreamsException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RStreamsException(Throwable cause) {
+ super(cause);
+ }
+
+ public RStreamsException(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RecoverStateStoreThrowable.java
similarity index 55%
copy from core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
copy to core/src/main/java/org/apache/rocketmq/streams/core/exception/RecoverStateStoreThrowable.java
index 8b675f61..27478555 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/exception/RecoverStateStoreThrowable.java
@@ -1,4 +1,3 @@
-package org.apache.rocketmq.streams.core.running;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,12 +14,25 @@ package org.apache.rocketmq.streams.core.running;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.streams.core.exception;
-public interface Processor<T> extends AutoCloseable {
- void addChild(Processor<T> processor);
+public class RecoverStateStoreThrowable extends Throwable {
+ public RecoverStateStoreThrowable() {
+ }
+ public RecoverStateStoreThrowable(String message) {
+ super(message);
+ }
- void preProcess(StreamContext<T> context) throws Throwable;
+ public RecoverStateStoreThrowable(String message, Throwable cause) {
+ super(message, cause);
+ }
- void process(T data) throws Throwable;
+ public RecoverStateStoreThrowable(Throwable cause) {
+ super(cause);
+ }
+
+ public RecoverStateStoreThrowable(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
index 5a6f3df6..a91c754d 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.metadata.Data;
@@ -62,7 +63,7 @@ public class AccumulatorSupplier<K, V, R, OV> implements
Supplier<Processor<V>>
}
@Override
- public void preProcess(StreamContext<V> context) throws Throwable {
+ public void preProcess(StreamContext<V> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
this.stateStore = super.waitStateReplay();
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
index 99c9e5ac..0bb30e5a 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.AggregateAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
@@ -63,7 +64,7 @@ public class AggregateSupplier<K, V, OV> implements
Supplier<Processor<V>> {
}
@Override
- public void preProcess(StreamContext<V> context) throws Throwable {
+ public void preProcess(StreamContext<V> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
this.stateStore = super.waitStateReplay();
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
index 8958012d..60fe3644 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
@@ -66,7 +67,7 @@ public class JoinAggregateSupplier<K, V1, V2, OUT> implements
Supplier<Processor
}
@Override
- public void preProcess(StreamContext<Object> context) throws Throwable
{
+ public void preProcess(StreamContext<Object> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
this.stateStore = super.waitStateReplay();
String stateTopicName = getSourceTopic() +
Constant.STATE_TOPIC_SUFFIX;
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
index 363c8bfd..9c7cb312 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
@@ -79,7 +80,7 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT>
implements Supplier<Pro
}
@Override
- public void preProcess(StreamContext<Object> context) throws Throwable
{
+ public void preProcess(StreamContext<Object> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
leftWindowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState, WindowState::windowState2Byte);
rightWindowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState, WindowState::windowState2Byte);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
index cccfff94..46f2eeea 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
@@ -63,15 +63,14 @@ public class SinkSupplier<K, T> implements
Supplier<Processor<T>> {
@Override
public void process(T data) throws Throwable {
if (data != null) {
- Message message;
-
- //todo 异常体系,哪些可以不必中断线程,哪些是需要中断的?
byte[] value = this.serializer.serialize(key, data);
if (value == null) {
//目前RocketMQ不支持发送body为null的消息;
return;
}
+ Message message;
+
if (this.key == null) {
message = new Message(this.topicName, value);
@@ -91,7 +90,6 @@ public class SinkSupplier<K, T> implements
Supplier<Processor<T>> {
message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME,
this.key.getClass().getName());
message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME,
data.getClass().getName());
- //todo 丑陋
if
(this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
message.putUserProperty(Constant.SOURCE_TIMESTAMP,
String.valueOf(this.context.getDataTime()));
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
index 00435845..ec7decbd 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SourceSupplier.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.streams.core.exception.DeserializeThrowable;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.window.TimeType;
@@ -40,7 +41,7 @@ public class SourceSupplier<K, V> implements
Supplier<Processor<V>> {
}
public interface SourceProcessor<K, V> extends Processor<V> {
- Pair<K, V> deserialize(String keyClass, String valueClass, byte[]
data) throws Throwable;
+ Pair<K, V> deserialize(String keyClass, String valueClass, byte[]
data) throws DeserializeThrowable;
long getTimestamp(MessageExt originData, TimeType timeType);
@@ -59,9 +60,13 @@ public class SourceSupplier<K, V> implements
Supplier<Processor<V>> {
}
@Override
- public Pair<K, V> deserialize(String keyClass, String valueClass,
byte[] data) throws Throwable {
- this.deserializer.configure(keyClass, valueClass);
- return this.deserializer.deserialize(data);
+ public Pair<K, V> deserialize(String keyClass, String valueClass,
byte[] data) throws DeserializeThrowable {
+ try {
+ this.deserializer.configure(keyClass, valueClass);
+ return this.deserializer.deserialize(data);
+ }catch (Throwable t) {
+ throw new DeserializeThrowable(t);
+ }
}
@Override
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
index 99c11211..e340e022 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.metadata.Data;
@@ -87,7 +88,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV>
implements Supplier<Processo
}
@Override
- public void preProcess(StreamContext<V> context) throws Throwable {
+ public void preProcess(StreamContext<V> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
this.windowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState, WindowState::windowState2Byte);
@@ -169,7 +170,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV>
implements Supplier<Processo
}
@Override
- public void preProcess(StreamContext<V> context) throws Throwable {
+ public void preProcess(StreamContext<V> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
super.windowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState, WindowState::windowState2Byte);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
index 9b36d529..d6ca254e 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.AggregateAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
@@ -86,7 +87,7 @@ public class WindowAggregateSupplier<K, V, OV> implements
Supplier<Processor<V>>
}
@Override
- public void preProcess(StreamContext<V> context) throws Throwable {
+ public void preProcess(StreamContext<V> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
this.windowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState, WindowState::windowState2Byte);
@@ -172,7 +173,7 @@ public class WindowAggregateSupplier<K, V, OV> implements
Supplier<Processor<V>>
}
@Override
- public void preProcess(StreamContext<V> context) throws Throwable {
+ public void preProcess(StreamContext<V> context) throws
RecoverStateStoreThrowable {
super.preProcess(context);
super.windowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState, WindowState::windowState2Byte);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
index f5094992..2b7e0952 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.state.StateStore;
import org.apache.rocketmq.streams.core.util.Utils;
@@ -42,7 +43,7 @@ public abstract class AbstractProcessor<T> implements
Processor<T> {
}
@Override
- public void preProcess(StreamContext<T> context) throws Throwable {
+ public void preProcess(StreamContext<T> context) throws
RecoverStateStoreThrowable {
this.context = context;
this.context.init(getChildren());
}
@@ -51,7 +52,7 @@ public abstract class AbstractProcessor<T> implements
Processor<T> {
return Collections.unmodifiableList(children);
}
- protected StateStore waitStateReplay() throws Throwable {
+ protected StateStore waitStateReplay() throws RecoverStateStoreThrowable {
MessageQueue sourceTopicQueue = new MessageQueue(getSourceTopic(),
getSourceBrokerName(), getSourceQueueId());
StateStore stateStore = context.getStateStore();
@@ -59,17 +60,11 @@ public abstract class AbstractProcessor<T> implements
Processor<T> {
return stateStore;
}
-
-
@SuppressWarnings("unchecked")
protected <KEY> Data<KEY, T> convert(Data<?, ?> data) {
return (Data<KEY, T>) new Data<>(data.getKey(), data.getValue(),
data.getTimestamp(), data.getHeader());
}
- @Override
- public void close() throws Exception {
-
- }
protected String getSourceBrokerName() {
String sourceTopicQueue =
context.getMessageFromWhichSourceTopicQueue();
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
index 21b32a22..ca39c480 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractWindowProcessor.java
@@ -26,8 +26,6 @@ import java.util.ArrayList;
import java.util.List;
public abstract class AbstractWindowProcessor<V> extends AbstractProcessor<V> {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractWindowProcessor.class.getName());
-
protected List<Window> calculateWindow(WindowInfo windowInfo, long
valueTime) {
long sizeInterval = windowInfo.getWindowSize().toMillSecond();
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
index 8b675f61..515dbb40 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/Processor.java
@@ -16,11 +16,13 @@ package org.apache.rocketmq.streams.core.running;
* limitations under the License.
*/
-public interface Processor<T> extends AutoCloseable {
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
+
+public interface Processor<T> {
void addChild(Processor<T> processor);
- void preProcess(StreamContext<T> context) throws Throwable;
+ void preProcess(StreamContext<T> context) throws
RecoverStateStoreThrowable;
void process(T data) throws Throwable;
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
index c2609714..fc0d5250 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/StreamContextImpl.java
@@ -17,6 +17,7 @@ package org.apache.rocketmq.streams.core.running;
*/
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.streams.core.exception.DataProcessThrowable;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.state.StateStore;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
index c94389d2..1fa86671 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
@@ -25,6 +25,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.DataProcessThrowable;
+import org.apache.rocketmq.streams.core.exception.DeserializeThrowable;
+import org.apache.rocketmq.streams.core.exception.RStreamsException;
import org.apache.rocketmq.streams.core.function.supplier.SourceSupplier;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
@@ -55,7 +58,8 @@ public class WorkerThread extends Thread {
private final Properties properties;
- public WorkerThread(TopologyBuilder topologyBuilder, Properties
properties) throws MQClientException {
+ public WorkerThread(String threadName, TopologyBuilder topologyBuilder,
Properties properties) throws MQClientException {
+ super(threadName);
this.topologyBuilder = topologyBuilder;
this.properties = properties;
String groupName = topologyBuilder.getJobId() + "_" +
ROCKETMQ_STREAMS_CONSUMER_GROUP;
@@ -87,10 +91,10 @@ public class WorkerThread extends Thread {
this.planetaryEngine.runInLoop();
} catch (Throwable e) {
- logger.error("planetaryEngine error.", e);
- throw new RuntimeException(e);
+ logger.error("worker thread=[{}], error:{}.", this.getName(), e);
+ throw new RStreamsException(e);
} finally {
- logger.info("planetaryEngine stop.");
+ logger.info("worker thread=[{}], engin stopped.", this.getName());
this.planetaryEngine.stop();
}
}
@@ -128,7 +132,6 @@ public class WorkerThread extends Thread {
}
- //处理
void start() throws Throwable {
createShuffleTopic();
@@ -139,68 +142,79 @@ public class WorkerThread extends Thread {
void runInLoop() throws Throwable {
while (!stop) {
- List<MessageExt> list = this.unionConsumer.poll(0);
- if (list.size() == 0) {
- Thread.sleep(10);
- continue;
- }
-
HashSet<MessageQueue> set = new HashSet<>();
- for (MessageExt messageExt : list) {
- byte[] body = messageExt.getBody();
- if (body == null || body.length == 0) {
- continue;
- }
-
- String keyClassName = messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
- String valueClassName = messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);
-
- String topic = messageExt.getTopic();
- int queueId = messageExt.getQueueId();
- String brokerName = messageExt.getBrokerName();
- MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
- set.add(queue);
- logger.debug("source topic queue:[{}]", queue);
-
-
- String key = Utils.buildKey(brokerName, topic, queueId);
- SourceSupplier.SourceProcessor<K, V> processor = (SourceSupplier.SourceProcessor<K, V>) wrapper.selectProcessor(key);
-
- StreamContextImpl<V> context = new StreamContextImpl<>(producer, mqAdmin, stateStore, key);
- processor.preProcess(context);
-
- Pair<K, V> pair = processor.deserialize(keyClassName, valueClassName, body);
-
- long timestamp;
- String userProperty = messageExt.getUserProperty(Constant.SOURCE_TIMESTAMP);
- if (!StringUtils.isEmpty(userProperty)) {
- timestamp = Long.parseLong(userProperty);
- } else {
- timestamp = processor.getTimestamp(messageExt, (TimeType) properties.get(Constant.TIME_TYPE));
+ try {
+ List<MessageExt> list = this.unionConsumer.poll(10);
+ for (MessageExt messageExt : list) {
+ byte[] body = messageExt.getBody();
+ if (body == null || body.length == 0) {
+ continue;
+ }
+
+ String keyClassName = messageExt.getUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME);
+ String valueClassName = messageExt.getUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME);
+
+ String topic = messageExt.getTopic();
+ int queueId = messageExt.getQueueId();
+ String brokerName = messageExt.getBrokerName();
+ MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
+ set.add(queue);
+ logger.debug("source topic queue:[{}]", queue);
+
+
+ String key = Utils.buildKey(brokerName, topic, queueId);
+ SourceSupplier.SourceProcessor<K, V> processor = (SourceSupplier.SourceProcessor<K, V>) wrapper.selectProcessor(key);
+
+ StreamContextImpl<V> context = new StreamContextImpl<>(producer, mqAdmin, stateStore, key);
+
+ processor.preProcess(context);
+
+ Pair<K, V> pair = processor.deserialize(keyClassName, valueClassName, body);
+
+ long timestamp;
+ String userProperty = messageExt.getUserProperty(Constant.SOURCE_TIMESTAMP);
+ if (!StringUtils.isEmpty(userProperty)) {
+ timestamp = Long.parseLong(userProperty);
+ } else {
+ timestamp = processor.getTimestamp(messageExt, (TimeType) properties.get(Constant.TIME_TYPE));
+ }
+
+ String delay = properties.getProperty(Constant.ALLOW_LATENESS_MILLISECOND, "0");
+ long watermark = processor.getWatermark(timestamp, Long.parseLong(delay));
+ context.setWatermark(watermark);
+
+ Data<K, V> data = new Data<>(pair.getKey(), pair.getValue(), timestamp, new Properties());
+ context.setKey(pair.getKey());
+ if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
+ logger.debug("shuffle data: [{}]", data);
+ } else {
+ logger.debug("source data: [{}]", data);
+ }
+
+ try {
+ context.forward(data);
+ } catch (Throwable t) {
+ logger.error("process error.", t);
+ throw new DataProcessThrowable(t);
+ }
}
- String delay = properties.getProperty(Constant.ALLOW_LATENESS_MILLISECOND, "0");
- long watermark = processor.getWatermark(timestamp, Long.parseLong(delay));
- context.setWatermark(watermark);
-
- Data<K, V> data = new Data<>(pair.getKey(), pair.getValue(), timestamp, new Properties());
- context.setKey(pair.getKey());
- if (topic.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
- logger.debug("shuffle data: [{}]", data);
+ } catch (Throwable t) {
+ Object skipDataError = properties.get(Constant.SKIP_DATA_ERROR);
+ if (skipDataError == Boolean.TRUE && t instanceof DataProcessThrowable || t instanceof DeserializeThrowable) {
+ //ignored
} else {
- logger.debug("source data: [{}]", data);
+ throw t;
}
- context.forward(data);
}
//todo 每次都提交位点消耗太大,后面改成拉取消息放入buffer的形式。
for (MessageQueue messageQueue : set) {
logger.debug("commit messageQueue: [{}]", messageQueue);
}
- this.unionConsumer.commit(set, true);
this.stateStore.persist(set);
- //todo 提交消费位点、写出sink数据、写出状态、需要保持原子
+ this.unionConsumer.commit(set, true);
}
}
@@ -225,11 +239,12 @@ public class WorkerThread extends Thread {
this.stop = true;
try {
+ this.stateStore.close();
this.unionConsumer.shutdown();
this.producer.shutdown();
this.mqAdmin.shutdown();
} catch (Throwable e) {
- logger.error("error when stop.", e);
+ logger.error("error when stop engin.", e);
}
}
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java b/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
index 0193d158..dbeda9f5 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/serialization/deImpl/KVJsonDeserializer.java
@@ -39,7 +39,6 @@ public class KVJsonDeserializer<K, V> extends ShuffleProtocol implements KeyValu
if (!StringUtils.isEmpty(valueClassName)) {
valueType = (Class<V>) Class.forName(valueClassName);
}
-
}
@Override
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
index 3a533eac..44e04c52 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.streams.core.common.Constant;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.window.WindowKey;
@@ -61,7 +62,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
private final RocksDBStore rocksDBStore;
private final Properties properties;
- private final ExecutorService executor = Executors.newFixedThreadPool(4);
+ private final ExecutorService executor = Executors.newFixedThreadPool(8);
private final ShuffleProtocol protocol = new ShuffleProtocol();
private final ConcurrentHashMap<MessageQueue/*messageQueue of state topic*/, CountDownLatch2> recoveringQueueMutex = new ConcurrentHashMap<>();
@@ -84,7 +85,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
}
@Override
- public void waitIfNotReady(MessageQueue messageQueue) throws Throwable {
+ public void waitIfNotReady(MessageQueue messageQueue) throws RecoverStateStoreThrowable {
MessageQueue stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(messageQueue);
CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(stateTopicQueue);
@@ -94,6 +95,8 @@ public class RocketMQStore extends AbstractStore implements StateStore {
start = System.currentTimeMillis();
waitPoint.await(5000, TimeUnit.MILLISECONDS);
end = System.currentTimeMillis();
+ } catch (Throwable t) {
+ throw new RecoverStateStoreThrowable(t);
} finally {
long cost = end - start;
if (cost > 2000) {
@@ -198,6 +201,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
try {
logger.debug("persist key: " + new String(key, StandardCharsets.UTF_8) + ",messageQueue: " + stateTopicQueue);
} catch (Throwable t) {
+ //key is not string, maybe.
}
this.producer.send(message, stateTopicQueue);
@@ -415,6 +419,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
@Override
public void close() throws Exception {
-
+ this.rocksDBStore.close();
+ this.executor.shutdown();
}
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java
index dc3753d5..1ad4fae0 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocksDBStore.java
@@ -36,7 +36,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-public class RocksDBStore extends AbstractStore {
+public class RocksDBStore extends AbstractStore implements AutoCloseable {
private static final String ROCKSDB_PATH = "/tmp/rocksdb";
private RocksDB rocksDB;
private WriteOptions writeOptions;
@@ -154,7 +154,7 @@ public class RocksDBStore extends AbstractStore {
}
public void close() throws Exception {
-
+ this.rocksDB.close();
}
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
index ad967ac9..4a0ce0b7 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
@@ -17,6 +17,7 @@ package org.apache.rocketmq.streams.core.state;
*/
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.util.Pair;
@@ -40,7 +41,7 @@ public interface StateStore extends AutoCloseable {
* @throws Throwable
*/
//如果没准备好,会阻塞
- void waitIfNotReady(MessageQueue messageQueue) throws Throwable;
+ void waitIfNotReady(MessageQueue messageQueue) throws RecoverStateStoreThrowable;
byte[] get(byte[] key) throws Throwable;