This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch feat/exception-handler in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 4a0fcbe9d73e239538b19cd61eda2a8d24eab901 Author: 2011shenlin <[email protected]> AuthorDate: Fri Apr 14 22:07:24 2023 +0800 feat:add error handler to runtime. --- .../api/converter/EventTargetConverter.java | 4 +- .../api/converter/EventTargetDTOConverter.java | 2 +- .../api/converter/EventTargetConverterTest.java | 4 +- .../api/converter/EventTargetDTOConverterTest.java | 4 +- .../rocketmq/repository/RocketMQMetaService.java | 7 +- .../boot/listener/TargetRunnerContext.java | 46 +++++++++++ .../adapter/runtimer/boot/pusher/PushCallback.java | 39 ++++++++++ .../adapter/runtimer/boot/pusher/PushRequest.java | 34 +++++++++ .../runtimer/boot/transfer/TransformEngine.java | 3 + .../adapter/runtimer/common/entity/RunOptions.java | 29 +++++++ .../runtimer/common/entity/TargetRunnerConfig.java | 8 ++ .../runtimer/config/RuntimerConfigDefine.java | 5 +- .../runtimer/retry/DeadLetterQueueService.java | 22 ++++++ .../adapter/runtimer/retry/ErrorHandler.java | 89 ++++++++++++++++++++++ .../adapter/runtimer/retry/EventBusStorage.java | 25 ++++++ .../runtimer/retry/EventBusStorageOnRocketMQ.java | 34 +++++++++ .../eventbridge}/enums/ErrorToleranceEnum.java | 2 +- .../eventbridge}/enums/PushRetryStrategyEnum.java | 4 +- .../domain/model/run/RetryStrategy.java | 2 +- .../eventbridge/domain/model/run/RunOptions.java | 2 +- 20 files changed, 349 insertions(+), 16 deletions(-) diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java index c71d290..3437479 100644 --- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java +++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverter.java @@ -23,8 +23,8 @@ import java.util.stream.Collectors; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO; -import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum; -import org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue; import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions; diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java index a0a7c42..aef09a8 100644 --- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java +++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverter.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO; -import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue; import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions; diff --git a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java index cd80f95..3b1b45c 100644 --- a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java +++ b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java @@ -22,8 +22,8 @@ import org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO; -import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum; -import org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue; import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget; diff --git a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java index 7662ad6..79445e6 100644 --- a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java +++ b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java @@ -19,8 +19,8 @@ package org.apache.rocketmq.eventbridge.adapter.api.converter; import java.util.Collections; import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO; -import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum; -import org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue; import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions; diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java index 5987337..bdd586e 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.eventbridge.adapter.persistence.data.rocketmq.repository; +import com.google.common.collect.Maps; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -23,13 +24,13 @@ import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.springframework.stereotype.Service; @@ -47,7 +48,7 @@ public class RocketMQMetaService { public boolean createTopic(String clusterName, String topicName) { try { - defaultMQAdminExt.createTopic(clusterName, topicName, DEFAULT_QUEUE_NUM); + defaultMQAdminExt.createTopic(clusterName, topicName, DEFAULT_QUEUE_NUM, Maps.newHashMap()); } catch (MQClientException e) { log.error("Create topic failed.", e); throw new EventBridgeException(DefaultErrorCode.InternalError, e); diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java new file mode 100644 index 0000000..1eff462 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; + +public class TargetRunnerContext implements TargetRunnerListener { + + public static Map<String, TargetRunnerConfig> targetRunnerConfigMap = Maps.newHashMap(); + + @Override + public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) { + + } + + @Override + public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) { + + } + + @Override + public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { + + } + + public static TargetRunnerConfig getTargetRunnerConfig(String targetRunnerName) { + return targetRunnerConfigMap.get(targetRunnerName); + } +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java new file mode 100644 index 0000000..8609cad --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushCallback.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher; + +import com.google.common.collect.Lists; +import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; +import org.springframework.beans.factory.annotation.Autowired; + +public class PushCallback { + + @Autowired + EventSubscriber eventSubscriber; + + public void completed(ConnectRecord connectRecord) { + eventSubscriber.commit(Lists.newArrayList(connectRecord)); + } + + public void completed(List<ConnectRecord> connectRecords) { + eventSubscriber.commit(connectRecords); + } + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java new file mode 100644 index 0000000..9223a5f --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/pusher/PushRequest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher; + +import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; +import lombok.Data; + +@Data +public class PushRequest { + + private String targetRunnerName; + + private String targetClass; + + private List<ConnectRecord> connectRecords; + + private PushCallback pushCallback; +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java index 046d50d..c29fc05 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java @@ -23,6 +23,7 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.Transform; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.internal.DefaultKeyValue; +import java.util.Map; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName; @@ -43,6 +44,8 @@ public class TransformEngine<R extends ConnectRecord> implements AutoCloseable { private final List<Transform> transformList; + private Map<String,List<Transform>> transformListMap; + private final KeyValue config; private final Plugin plugin; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java new file mode 100644 index 0000000..4e44981 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity; + +import lombok.Data; + +@Data +public class RunOptions { + + private String errorsTolerance; + + private String retryStrategy; + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java index a2eb405..63edbc8 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.Objects; import lombok.Data; +import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY; + /** * pusher target key config */ @@ -36,6 +38,8 @@ public class TargetRunnerConfig implements Serializable { */ private List<Map<String, String>> components; + private RunOptions runOptions = new RunOptions(); + @Override public boolean equals(Object o) { if (this == o) @@ -99,4 +103,8 @@ public class TargetRunnerConfig implements Serializable { } return true; } + + public String getEventBusName() { + return components.get(0).get(TARGET_RUNNER_KEY); + } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java index 04895cb..332f33d 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.config; import io.openmessaging.connector.api.data.ConnectRecord; - import java.util.HashSet; import java.util.Set; @@ -98,6 +97,10 @@ public class RuntimerConfigDefine { public static final String TRANSFORMS = "transforms"; + public static final String CONNECT_RECORDS_KEY = "SYSTEM_RETRY_TIMES"; + + public static final String TARGET_RUNNER_KEY = "eventBusName"; + /** * The required key for all configurations. */ diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java new file mode 100644 index 0000000..ce6a642 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; + +public class DeadLetterQueueService { + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java new file mode 100644 index 0000000..412b538 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; + +import com.google.common.base.Strings; +import io.openmessaging.connector.api.data.ConnectRecord; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.CONNECT_RECORDS_KEY; +import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.RUNNER_NAME; + +@Slf4j +public class ErrorHandler { + + @Autowired + EventBusStorage eventBusStorage; + + public void handle(ConnectRecord connectRecord, Throwable t) { + String eventRunnerName = connectRecord.getExtension(RUNNER_NAME); + TargetRunnerConfig targetRunnerConfig = TargetRunnerContext.getTargetRunnerConfig(eventRunnerName); + String eventBusName = targetRunnerConfig.getEventBusName(); + PushRetryStrategyEnum pushRetryStrategyEnum = PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy()); + int retryTimes = parseRetryTimes(connectRecord); + int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum); + if (delaySec > 0) { + eventBusStorage.put(eventBusName, connectRecord, delaySec); + } + } + + private int parseRetryTimes(ConnectRecord connectRecord) { + int retryTimes = 0; + String retryTag = connectRecord.getExtension(CONNECT_RECORDS_KEY); + if (Strings.isNullOrEmpty(retryTag)) { + return retryTimes; + } + try { + retryTimes = Integer.parseInt(retryTag); + } catch (Throwable e) { + log.warn("parse retry times failed. retryTag={}", retryTag); + } + return retryTimes; + } + + /** + * Return right time or -1 (already done) + * + * @param retryTimes + * @param pushRetryStrategyEnum + * @return + */ + private int calcDelaySec(int retryTimes, PushRetryStrategyEnum pushRetryStrategyEnum) { + + switch (pushRetryStrategyEnum) { + case BACKOFF_RETRY: + if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) { + return -1; + } + return 10; + case EXPONENTIAL_DECAY_RETRY: + if (retryTimes >= pushRetryStrategyEnum.getRetryTimes()) { + return -1; + } + int pow = (int) Math.pow(2, 3 + retryTimes); + return (pow > 512 ? 512 : pow); + default: + return -1; + } + } + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java new file mode 100644 index 0000000..cbec332 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; + +import io.openmessaging.connector.api.data.ConnectRecord; + +public interface EventBusStorage { + + void put(String eventBusName, ConnectRecord connectRecord, int delaySec); +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java new file mode 100644 index 0000000..94d8d4a --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; + +import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; + +public class EventBusStorageOnRocketMQ implements EventBusStorage { + + @Override + public void put(String eventBusName, ConnectRecord connectRecord, int delaySec) { + + } + + public List<String> parseEventBusName(String eventBusName) { + //TODO + return null; + } +} \ No newline at end of file diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java similarity index 95% rename from domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java rename to common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java index f115a69..01f8c24 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/ErrorToleranceEnum.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/ErrorToleranceEnum.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.domain.common.enums; +package org.apache.rocketmq.eventbridge.enums; import com.google.common.base.Strings; diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java similarity index 94% rename from domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java rename to common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java index 314b224..913b0ab 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/PushRetryStrategyEnum.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.domain.common.enums; +package org.apache.rocketmq.eventbridge.enums; import com.google.common.base.Strings; @@ -27,7 +27,7 @@ public enum PushRetryStrategyEnum { /** * 176 times: 8s~16s~32s~64s~128s~256s~512s ... 512s(170) */ - EXPONENTIAL_DECAY_RETRY(2, 10); + EXPONENTIAL_DECAY_RETRY(2, 170); private int code; private int retryTimes; diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java index 72dbe70..84f813d 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java +++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RetryStrategy.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.domain.model.run; import lombok.Builder; import lombok.Data; -import org.apache.rocketmq.eventbridge.domain.common.enums.PushRetryStrategyEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; @Builder @Data diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java index a040d32..60c3a73 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java +++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/run/RunOptions.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.eventbridge.domain.model.run; import lombok.Builder; import lombok.Data; -import org.apache.rocketmq.eventbridge.domain.common.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; @Builder @Data
