This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch feat/exception-handler
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 4a0fcbe9d73e239538b19cd61eda2a8d24eab901
Author: 2011shenlin <[email protected]>
AuthorDate: Fri Apr 14 22:07:24 2023 +0800

    feat:add error handler to runtime.
---
 .../api/converter/EventTargetConverter.java        |  4 +-
 .../api/converter/EventTargetDTOConverter.java     |  2 +-
 .../api/converter/EventTargetConverterTest.java    |  4 +-
 .../api/converter/EventTargetDTOConverterTest.java |  4 +-
 .../rocketmq/repository/RocketMQMetaService.java   |  7 +-
 .../boot/listener/TargetRunnerContext.java         | 46 +++++++++++
 .../adapter/runtimer/boot/pusher/PushCallback.java | 39 ++++++++++
 .../adapter/runtimer/boot/pusher/PushRequest.java  | 34 +++++++++
 .../runtimer/boot/transfer/TransformEngine.java    |  3 +
 .../adapter/runtimer/common/entity/RunOptions.java | 29 +++++++
 .../runtimer/common/entity/TargetRunnerConfig.java |  8 ++
 .../runtimer/config/RuntimerConfigDefine.java      |  5 +-
 .../runtimer/retry/DeadLetterQueueService.java     | 22 ++++++
 .../adapter/runtimer/retry/ErrorHandler.java       | 89 ++++++++++++++++++++++
 .../adapter/runtimer/retry/EventBusStorage.java    | 25 ++++++
 .../runtimer/retry/EventBusStorageOnRocketMQ.java  | 34 +++++++++
 .../eventbridge}/enums/ErrorToleranceEnum.java     |  2 +-
 .../eventbridge}/enums/PushRetryStrategyEnum.java  |  4 +-
 .../domain/model/run/RetryStrategy.java            |  2 +-
 .../eventbridge/domain/model/run/RunOptions.java   |  2 +-
 20 files changed, 349 insertions(+), 16 deletions(-)

diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
index c71d290..3437479 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java
@@ -23,8 +23,8 @@ import java.util.stream.Collectors;
 import 
org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
-import 
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
 import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
 import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
index a0a7c42..aef09a8 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java
@@ -23,7 +23,7 @@ import 
org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
 import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
 import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
diff --git 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
index cd80f95..3b1b45c 100644
--- 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
+++ 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java
@@ -22,8 +22,8 @@ import 
org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
-import 
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
 import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
 import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
 import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget;
diff --git 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
index 7662ad6..79445e6 100644
--- 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
+++ 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.eventbridge.adapter.api.converter;
 
 import java.util.Collections;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
-import 
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
 import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue;
 import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
 import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
diff --git 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
index 5987337..bdd586e 100644
--- 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
+++ 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java
@@ -16,6 +16,7 @@
  */
 package 
org.apache.rocketmq.eventbridge.adapter.persistence.data.rocketmq.repository;
 
+import com.google.common.collect.Maps;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -23,13 +24,13 @@ import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
 import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.springframework.stereotype.Service;
 
@@ -47,7 +48,7 @@ public class RocketMQMetaService {
 
     public boolean createTopic(String clusterName, String topicName) {
         try {
-            defaultMQAdminExt.createTopic(clusterName, topicName, 
DEFAULT_QUEUE_NUM);
+            defaultMQAdminExt.createTopic(clusterName, topicName, 
DEFAULT_QUEUE_NUM, Maps.newHashMap());
         } catch (MQClientException e) {
             log.error("Create topic failed.", e);
             throw new EventBridgeException(DefaultErrorCode.InternalError, e);
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java
new file mode 100644
index 0000000..1eff462
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+
+public class TargetRunnerContext implements TargetRunnerListener {
+
+    public static Map<String, TargetRunnerConfig> targetRunnerConfigMap = 
Maps.newHashMap();
+
+    @Override
+    public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
+    @Override
+    public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
+    @Override
+    public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) {
+
+    }
+
+    public static TargetRunnerConfig getTargetRunnerConfig(String 
targetRunnerName) {
+        return targetRunnerConfigMap.get(targetRunnerName);
+    }
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
new file mode 100644
index 0000000..8609cad
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
+
+import com.google.common.collect.Lists;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public class PushCallback {
+
+    @Autowired
+    EventSubscriber eventSubscriber;
+
+    public void completed(ConnectRecord connectRecord) {
+        eventSubscriber.commit(Lists.newArrayList(connectRecord));
+    }
+
+    public void completed(List<ConnectRecord> connectRecords) {
+        eventSubscriber.commit(connectRecords);
+    }
+
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
new file mode 100644
index 0000000..9223a5f
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+import lombok.Data;
+
+@Data
+public class PushRequest {
+
+    private String targetRunnerName;
+
+    private String targetClass;
+
+    private List<ConnectRecord> connectRecords;
+
+    private PushCallback pushCallback;
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index 046d50d..c29fc05 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -23,6 +23,7 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.util.Map;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
@@ -43,6 +44,8 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     private final List<Transform> transformList;
 
+    private Map<String,List<Transform>> transformListMap;
+
     private final KeyValue config;
 
     private final Plugin plugin;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
new file mode 100644
index 0000000..4e44981
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
+
+import lombok.Data;
+
+@Data
+public class RunOptions {
+
+    private String errorsTolerance;
+
+    private String retryStrategy;
+
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index a2eb405..63edbc8 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Objects;
 import lombok.Data;
 
+import static 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
+
 /**
  * pusher target key config
  */
@@ -36,6 +38,8 @@ public class TargetRunnerConfig implements Serializable {
      */
     private List<Map<String, String>> components;
 
+    private RunOptions runOptions = new RunOptions();
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -99,4 +103,8 @@ public class TargetRunnerConfig implements Serializable {
         }
         return true;
     }
+
+    public String getEventBusName() {
+        return components.get(0).get(TARGET_RUNNER_KEY);
+    }
 }
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 04895cb..332f33d 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -19,7 +19,6 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
-
 import java.util.HashSet;
 import java.util.Set;
 
@@ -98,6 +97,10 @@ public class RuntimerConfigDefine {
 
     public static final String TRANSFORMS = "transforms";
 
+    public static final String CONNECT_RECORDS_KEY = "SYSTEM_RETRY_TIMES";
+
+    public static final String TARGET_RUNNER_KEY = "eventBusName";
+
     /**
      * The required key for all configurations.
      */
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
new file mode 100644
index 0000000..ce6a642
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+public class DeadLetterQueueService {
+
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
new file mode 100644
index 0000000..412b538
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+import com.google.common.base.Strings;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.CONNECT_RECORDS_KEY;
+import static 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.RUNNER_NAME;
+
+@Slf4j
+public class ErrorHandler {
+
+    @Autowired
+    EventBusStorage eventBusStorage;
+
+    public void handle(ConnectRecord connectRecord, Throwable t) {
+        String eventRunnerName = connectRecord.getExtension(RUNNER_NAME);
+        TargetRunnerConfig targetRunnerConfig = 
TargetRunnerContext.getTargetRunnerConfig(eventRunnerName);
+        String eventBusName = targetRunnerConfig.getEventBusName();
+        PushRetryStrategyEnum pushRetryStrategyEnum = 
PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy());
+        int retryTimes = parseRetryTimes(connectRecord);
+        int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum);
+        if (delaySec > 0) {
+            eventBusStorage.put(eventBusName, connectRecord, delaySec);
+        }
+    }
+
+    private int parseRetryTimes(ConnectRecord connectRecord) {
+        int retryTimes = 0;
+        String retryTag = connectRecord.getExtension(CONNECT_RECORDS_KEY);
+        if (Strings.isNullOrEmpty(retryTag)) {
+            return retryTimes;
+        }
+        try {
+            retryTimes = Integer.parseInt(retryTag);
+        } catch (Throwable e) {
+            log.warn("parse retry times failed. retryTag={}", retryTag);
+        }
+        return retryTimes;
+    }
+
+    /**
+     * Return right time or -1 (already done)
+     *
+     * @param retryTimes
+     * @param pushRetryStrategyEnum
+     * @return
+     */
+    private int calcDelaySec(int retryTimes, PushRetryStrategyEnum 
pushRetryStrategyEnum) {
+
+        switch (pushRetryStrategyEnum) {
+            case BACKOFF_RETRY:
+                if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) {
+                    return -1;
+                }
+                return 10;
+            case EXPONENTIAL_DECAY_RETRY:
+                if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) {
+                    return -1;
+                }
+                int pow = (int) Math.pow(2, 3 + retryTimes);
+                return (pow > 512 ? 512 : pow);
+            default:
+                return -1;
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
new file mode 100644
index 0000000..cbec332
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+
+public interface EventBusStorage {
+
+    void put(String eventBusName, ConnectRecord connectRecord, int delaySec);
+}
\ No newline at end of file
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
new file mode 100644
index 0000000..94d8d4a
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.retry;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.List;
+
+public class EventBusStorageOnRocketMQ implements EventBusStorage {
+
+    @Override
+    public void put(String eventBusName, ConnectRecord connectRecord, int 
delaySec) {
+
+    }
+
+    public List<String> parseEventBusName(String eventBusName) {
+        //TODO
+        return null;
+    }
+}
\ No newline at end of file
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java
 
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java
similarity index 95%
rename from 
domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java
rename to 
common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java
index f115a69..01f8c24 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java
+++ 
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.eventbridge.domain.common.enums;
+package org.apache.rocketmq.eventbridge.enums;
 
 import com.google.common.base.Strings;
 
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java
 
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
similarity index 94%
rename from 
domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java
rename to 
common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
index 314b224..913b0ab 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java
+++ 
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.domain.common.enums;
+package org.apache.rocketmq.eventbridge.enums;
 
 import com.google.common.base.Strings;
 
@@ -27,7 +27,7 @@ public enum PushRetryStrategyEnum {
     /**
      * 176 times: 8s~16s~32s~64s~128s~256s~512s ... 512s(170)
      */
-    EXPONENTIAL_DECAY_RETRY(2, 10);
+    EXPONENTIAL_DECAY_RETRY(2, 170);
 
     private int code;
     private int retryTimes;
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
index 72dbe70..84f813d 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.domain.model.run;
 
 import lombok.Builder;
 import lombok.Data;
-import 
org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
 
 @Builder
 @Data
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
index a040d32..60c3a73 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.domain.model.run;
 
 import lombok.Builder;
 import lombok.Data;
-import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
 
 @Builder
 @Data

Reply via email to