This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new 4d5f074 feat:add error handler to runtime.
4d5f074 is described below
commit 4d5f074048505153026a68375cfe7e4d8e0a7ef8
Author: 2011shenlin <[email protected]>
AuthorDate: Fri Apr 14 22:07:24 2023 +0800
feat:add error handler to runtime.
---
.../api/converter/EventTargetConverter.java | 4 +-
.../api/converter/EventTargetDTOConverter.java | 2 +-
.../api/converter/EventTargetConverterTest.java | 4 +-
.../api/converter/EventTargetDTOConverterTest.java | 4 +-
.../rocketmq/repository/RocketMQMetaService.java | 7 +-
.../boot/listener/TargetRunnerContext.java | 46 +++++++++++
.../adapter/runtimer/boot/pusher/PushCallback.java | 39 ++++++++++
.../adapter/runtimer/boot/pusher/PushRequest.java | 34 +++++++++
.../runtimer/boot/transfer/TransformEngine.java | 3 +
.../adapter/runtimer/common/entity/RunOptions.java | 29 +++++++
.../runtimer/common/entity/TargetRunnerConfig.java | 8 ++
.../runtimer/config/RuntimerConfigDefine.java | 5 +-
.../runtimer/retry/DeadLetterQueueService.java | 22 ++++++
.../adapter/runtimer/retry/ErrorHandler.java | 89 ++++++++++++++++++++++
.../adapter/runtimer/retry/EventBusStorage.java | 25 ++++++
.../runtimer/retry/EventBusStorageOnRocketMQ.java | 34 +++++++++
.../eventbridge}/enums/ErrorToleranceEnum.java | 2 +-
.../eventbridge}/enums/PushRetryStrategyEnum.java | 4 +-
.../domain/model/run/RetryStrategy.java | 2 +-
.../eventbridge/domain/model/run/RunOptions.java | 2 +-
20 files changed, 349 insertions(+), 16 deletions(-)
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
index c71d290..3437479 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
@@ -23,8 +23,8 @@ import java.util.stream.Collectors;
import
org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
-import
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
index a0a7c42..aef09a8 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
@@ -23,7 +23,7 @@ import
org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
diff --git
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
index cd80f95..3b1b45c 100644
---
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
+++
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
@@ -22,8 +22,8 @@ import
org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
-import
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
diff --git
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
index 7662ad6..79445e6 100644
---
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
+++
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.eventbridge.adapter.api.converter;
import java.util.Collections;
import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
-import
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
diff --git
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
index 5987337..bdd586e 100644
---
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
+++
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
@@ -16,6 +16,7 @@
*/
package
org.apache.rocketmq.eventbridge.adapter.persistence.data.rocketmq.repository;
+import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -23,13 +24,13 @@ import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.stereotype.Service;
@@ -47,7 +48,7 @@ public class RocketMQMetaService {
public boolean createTopic(String clusterName, String topicName) {
try {
- defaultMQAdminExt.createTopic(clusterName, topicName,
DEFAULT_QUEUE_NUM);
+ defaultMQAdminExt.createTopic(clusterName, topicName,
DEFAULT_QUEUE_NUM, Maps.newHashMap());
} catch (MQClientException e) {
log.error("Create topic failed.", e);
throw new EventBridgeException(DefaultErrorCode.InternalError, e);
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java
new file mode 100644
index 0000000..1eff462
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public class TargetRunnerContext implements TargetRunnerListener {
+
+ public static Map<String, TargetRunnerConfig> targetRunnerConfigMap =
Maps.newHashMap();
+
+ @Override
+ public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
+ @Override
+ public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
+ @Override
+ public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+ }
+
+ public static TargetRunnerConfig getTargetRunnerConfig(String
targetRunnerName) {
+ return targetRunnerConfigMap.get(targetRunnerName);
+ }
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
new file mode 100644
index 0000000..8609cad
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
+
+import com.google.common.collect.Lists;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class PushCallback {
+
+ @Autowired
+ EventSubscriber eventSubscriber;
+
+ public void completed(ConnectRecord connectRecord) {
+ eventSubscriber.commit(Lists.newArrayList(connectRecord));
+ }
+
+ public void completed(List<ConnectRecord> connectRecords) {
+ eventSubscriber.commit(connectRecords);
+ }
+
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
new file mode 100644
index 0000000..9223a5f
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import lombok.Data;
+
+@Data
+public class PushRequest {
+
+ private String targetRunnerName;
+
+ private String targetClass;
+
+ private List<ConnectRecord> connectRecords;
+
+ private PushCallback pushCallback;
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index 046d50d..c29fc05 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -23,6 +23,7 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
+import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
@@ -43,6 +44,8 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
private final List<Transform> transformList;
+ private Map<String,List<Transform>> transformListMap;
+
private final KeyValue config;
private final Plugin plugin;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
new file mode 100644
index 0000000..4e44981
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
+
+import lombok.Data;
+
+@Data
+public class RunOptions {
+
+ private String errorsTolerance;
+
+ private String retryStrategy;
+
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index a2eb405..63edbc8 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Objects;
import lombok.Data;
+import static
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
+
/**
* pusher target key config
*/
@@ -36,6 +38,8 @@ public class TargetRunnerConfig implements Serializable {
*/
private List<Map<String, String>> components;
+ private RunOptions runOptions = new RunOptions();
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -99,4 +103,8 @@ public class TargetRunnerConfig implements Serializable {
}
return true;
}
+
+ public String getEventBusName() {
+ return components.get(0).get(TARGET_RUNNER_KEY);
+ }
}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 04895cb..332f33d 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -19,7 +19,6 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
import io.openmessaging.connector.api.data.ConnectRecord;
-
import java.util.HashSet;
import java.util.Set;
@@ -98,6 +97,10 @@ public class RuntimerConfigDefine {
public static final String TRANSFORMS = "transforms";
+ public static final String CONNECT_RECORDS_KEY = "SYSTEM_RETRY_TIMES";
+
+ public static final String TARGET_RUNNER_KEY = "eventBusName";
+
/**
* The required key for all configurations.
*/
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
new file mode 100644
index 0000000..ce6a642
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+public class DeadLetterQueueService {
+
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
new file mode 100644
index 0000000..412b538
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+import com.google.common.base.Strings;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.CONNECT_RECORDS_KEY;
+import static
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.RUNNER_NAME;
+
+@Slf4j
+public class ErrorHandler {
+
+ @Autowired
+ EventBusStorage eventBusStorage;
+
+ public void handle(ConnectRecord connectRecord, Throwable t) {
+ String eventRunnerName = connectRecord.getExtension(RUNNER_NAME);
+ TargetRunnerConfig targetRunnerConfig =
TargetRunnerContext.getTargetRunnerConfig(eventRunnerName);
+ String eventBusName = targetRunnerConfig.getEventBusName();
+ PushRetryStrategyEnum pushRetryStrategyEnum =
PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy());
+ int retryTimes = parseRetryTimes(connectRecord);
+ int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum);
+ if (delaySec > 0) {
+ eventBusStorage.put(eventBusName, connectRecord, delaySec);
+ }
+ }
+
+ private int parseRetryTimes(ConnectRecord connectRecord) {
+ int retryTimes = 0;
+ String retryTag = connectRecord.getExtension(CONNECT_RECORDS_KEY);
+ if (Strings.isNullOrEmpty(retryTag)) {
+ return retryTimes;
+ }
+ try {
+ retryTimes = Integer.parseInt(retryTag);
+ } catch (Throwable e) {
+ log.warn("parse retry times failed. retryTag={}", retryTag);
+ }
+ return retryTimes;
+ }
+
+ /**
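+ * Computes the delay, in seconds, before the next retry; returns -1 when no retry should be scheduled.
+ *
+ * @param retryTimes retry count parsed from the record (0 when absent or unparsable)
+ * @param pushRetryStrategyEnum retry strategy that selects the delay schedule and supplies the retry limit
+ * @return 10 for BACKOFF_RETRY, min(2^(3 + retryTimes), 512) for EXPONENTIAL_DECAY_RETRY, or -1 once retryTimes reaches the strategy's limit or for any other strategy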
+ */
+ private int calcDelaySec(int retryTimes, PushRetryStrategyEnum
pushRetryStrategyEnum) {
+
+ switch (pushRetryStrategyEnum) {
+ case BACKOFF_RETRY:
+ if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) {
+ return -1;
+ }
+ return 10;
+ case EXPONENTIAL_DECAY_RETRY:
+ if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) {
+ return -1;
+ }
+ int pow = (int) Math.pow(2, 3 + retryTimes);
+ return (pow > 512 ? 512 : pow);
+ default:
+ return -1;
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
new file mode 100644
index 0000000..cbec332
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+
+public interface EventBusStorage {
+
+ void put(String eventBusName, ConnectRecord connectRecord, int delaySec);
+}
\ No newline at end of file
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
new file mode 100644
index 0000000..94d8d4a
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+
+public class EventBusStorageOnRocketMQ implements EventBusStorage {
+
+ @Override
+ public void put(String eventBusName, ConnectRecord connectRecord, int
delaySec) {
+
+ }
+
+ public List<String> parseEventBusName(String eventBusName) {
+ //TODO
+ return null;
+ }
+}
\ No newline at end of file
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java
similarity index 95%
rename from
domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java
rename to
common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java
index f115a69..01f8c24 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java
+++
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.domain.common.enums;
+package org.apache.rocketmq.eventbridge.enums;
import com.google.common.base.Strings;
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
similarity index 94%
rename from
domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java
rename to
common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
index 314b224..913b0ab 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java
+++
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.domain.common.enums;
+package org.apache.rocketmq.eventbridge.enums;
import com.google.common.base.Strings;
@@ -27,7 +27,7 @@ public enum PushRetryStrategyEnum {
/**
* 176 times: 8s~16s~32s~64s~128s~256s~512s ... 512s(170)
*/
- EXPONENTIAL_DECAY_RETRY(2, 10);
+ EXPONENTIAL_DECAY_RETRY(2, 170);
private int code;
private int retryTimes;
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
index 72dbe70..84f813d 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.domain.model.run;
import lombok.Builder;
import lombok.Data;
-import
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
@Builder
@Data
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
index a040d32..60c3a73 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.domain.model.run;
import lombok.Builder;
import lombok.Data;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
@Builder
@Data