This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/main by this push:
new 6fc28a0 fix: code style issues to make compile pass with checkstyle
plugin enabled (#160)
6fc28a0 is described below
commit 6fc28a05f92f912085ed5ffa48eb2d9c80e0c53a
Author: Zhanhui Li <[email protected]>
AuthorDate: Sun Feb 25 18:57:46 2024 +0800
fix: code style issues to make compile pass with checkstyle plugin enabled
(#160)
Signed-off-by: Li Zhanhui <[email protected]>
---
.../adapter/benchmark/AbstractEventCommon.java | 6 +-
.../adapter/benchmark/EventTPSCommon.java | 16 ++---
.../adapter/persistence/DatasourceConfig.java | 5 +-
.../MybatisEventTargetRunnerRepository.java | 1 -
.../impl/runtime/RuntimeTargetRunnerAPIImpl.java | 28 ++++----
.../eventbridge/adapter/runtime/Runtime.java | 11 ++-
.../adapter/runtime/boot/EventBusListener.java | 6 +-
.../adapter/runtime/boot/EventRuleTransfer.java | 16 ++---
.../adapter/runtime/boot/EventTargetTrigger.java | 18 +++--
.../runtime/boot/common/CirculatorContext.java | 82 +++++++++++++---------
.../runtime/boot/listener/EventSubscriber.java | 8 ++-
.../runtime/boot/transfer/TransformEngine.java | 17 +++--
.../runtime/boot/trigger/TriggerTaskContext.java | 32 ++++-----
.../adapter/runtime/common/LoggerName.java | 8 +--
.../adapter/runtime/common/ServiceThread.java | 16 ++---
.../runtime/common/entity/TargetKeyValue.java | 14 ++--
.../runtime/common/entity/TargetRunnerConfig.java | 18 ++---
.../adapter/runtime/common/plugin/Plugin.java | 12 +++-
.../adapter/runtime/common/plugin/PluginUtils.java | 11 ++-
.../common/store/FileBaseKeyValueStore.java | 2 +-
.../adapter/runtime/config/RuntimeConfigProps.java | 17 +++--
.../runtime/config/RuntimeConfiguration.java | 1 -
.../adapter/runtime/converter/JsonConverter.java | 2 +-
.../adapter/runtime/converter/ListConverter.java | 2 +-
.../runtime/converter/RecordOffsetConverter.java | 2 +-
.../adapter/runtime/error/ErrorHandler.java | 2 +-
.../AbstractTargetRunnerConfigObserver.java | 2 +-
.../service/TargetRunnerConfigOnFileObserver.java | 1 -
.../adapter/runtime/utils/ExceptionUtil.java | 4 +-
.../adapter/runtime/utils/ShutdownUtils.java | 6 +-
.../rocketmq/runtimer/RocketMQEventSubscriber.java | 61 +++++++++-------
.../rocketmq/runtimer/consumer/ClientConfig.java | 7 +-
.../rocketmq/runtimer/consumer/ConsumeRequest.java | 4 --
.../runtimer/consumer/LitePullConsumer.java | 4 --
.../runtimer/consumer/LitePullConsumerImpl.java | 40 +++++------
.../runtimer/consumer/LocalMessageCache.java | 4 --
.../eventbridge/enums/PushRetryStrategyEnum.java | 3 +-
.../infrastructure/trace/TraceStrategy.java | 5 +-
.../eventbridge/filter/ValidateFilter.java | 16 ++---
39 files changed, 258 insertions(+), 252 deletions(-)
diff --git
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
index ed6a91c..33b59fa 100644
---
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
+++
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/AbstractEventCommon.java
@@ -48,19 +48,19 @@ public abstract class AbstractEventCommon {
return;
}
- // tps: 每秒文件打印的行数
+ // tps: rows to print for each second
final long tps = currentRowCount - previousRowCount.get();
previousRowCount.set(currentRowCount);
writeCount.add(currentRowCount);
costTime.add(1000);
- // delayTime(条/ms)=接收的数量/花费的时间
+ // delayTime(record/ms)= receiving-amount / time
final double delayTime = writeCount.longValue() / costTime.longValue();
// String delayTimeStr = twoDecimal(delayTime);
String info = String.format("Current Time: %s | TPS: %d ",
UtilAll.timeMillisToHumanString2(System.currentTimeMillis()),
tps);
- System.out.println(info);
+ System.out.printf("%s%n", info);
}
diff --git
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
index e9acb87..8f91eb8 100644
---
a/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
+++
b/adapter/benchmark/src/main/java/org/apache/rocketmq/eventbridge/adapter/benchmark/EventTPSCommon.java
@@ -16,18 +16,20 @@
*/
package org.apache.rocketmq.eventbridge.adapter.benchmark;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.*;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
- * 整条链路
+ * End-to-End use case
*/
public class EventTPSCommon extends AbstractEventCommon {
public static void main(String[] args) {
@@ -35,12 +37,10 @@ public class EventTPSCommon extends AbstractEventCommon {
if (args.length > 0) {
filePath = args[0];
}
- EventTPSCommon tpsCommon = null;
+ EventTPSCommon tpsCommon;
try {
tpsCommon = new EventTPSCommon(filePath);
tpsCommon.start();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
@@ -56,7 +56,7 @@ public class EventTPSCommon extends AbstractEventCommon {
previousRowCount = new AtomicReference<>();
previousRowCount.set(0);
executorService = new ScheduledThreadPoolExecutor(1,
- new
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
+ new
BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-all-%d").build());
}
@Override
diff --git
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
index 6718b8d..8418830 100644
---
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
+++
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/DatasourceConfig.java
@@ -75,7 +75,7 @@ public class DatasourceConfig {
private Long validationTimeoutMs;
@Bean("dataSource")
- public DataSource getMasterDataSource(){
+ public DataSource getMasterDataSource() {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(baseUrl);
hikariConfig.setDriverClassName(baseDriverClassName);
@@ -102,7 +102,8 @@ public class DatasourceConfig {
}
@Bean("sqlSessionTemplate")
- public SqlSessionTemplate
masterSqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory
sqlSessionFactory){
+ public SqlSessionTemplate masterSqlSessionTemplate(
+ @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
diff --git
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
index 97bfb88..adab7ec 100644
---
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
+++
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
import
org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.converter.EventTargetRunnerConverter;
import
org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.dataobject.EventTargetRunnerDO;
import
org.apache.rocketmq.eventbridge.adapter.persistence.target.mybatis.mapper.EventTargetRunnerMapper;
diff --git
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
index 77c6bc9..a0ebc6e 100644
---
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
+++
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -52,19 +52,21 @@ public class RuntimeTargetRunnerAPIImpl implements
TargetRunnerAPI {
targetRunnerConfig.setName(name);
List<Map<String, String>> components = Lists.newArrayList();
targetRunnerConfig.setComponents(components);
- Map<String, String> sourceComponent = new Gson().fromJson(new
Gson().toJson(source
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
- Map<String, String> filterComponent = new Gson().fromJson(new
Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern)
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
-
- Map<String, String> transformComponent = new Gson().fromJson(new
Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform)
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
- Map<String, String> targetComponent = new Gson().fromJson(new
Gson().toJson(target
- .getConfig()), new TypeToken<Map<String, String>>() {
- }.getType());
+
+ Map<String, String> sourceComponent = new Gson().fromJson(
+ new Gson().toJson(source.getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
+ Map<String, String> filterComponent = new Gson().fromJson(
+ new
Gson().toJson(RocketMQConverter.buildEventBridgeFilterTransform(filterPattern).getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
+ Map<String, String> transformComponent = new Gson().fromJson(new
Gson().toJson(RocketMQConverter.buildEventBridgeTransform(targetTransform).getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
+ Map<String, String> targetComponent = new Gson().fromJson(new
Gson().toJson(target.getConfig()),
+ new TypeToken<Map<String, String>>() {}.getType());
+
components.add(sourceComponent);
components.add(filterComponent);
components.add(transformComponent);
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index af8bbbd..ba3ed1f 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -34,19 +34,16 @@ import org.springframework.stereotype.Component;
import
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.StartAndShutdown;
import
org.apache.rocketmq.eventbridge.adapter.runtime.boot.hook.AbstractStartAndShutdown;
-
import javax.annotation.PostConstruct;
import java.util.concurrent.atomic.AtomicReference;
/**
* event bridge runtime
- *
- * @author artisan
*/
@Component
public class Runtime {
- private static final Logger logger =
LoggerFactory.getLogger(Runtime.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Runtime.class);
private AtomicReference<RuntimeState> runtimerState;
@@ -65,7 +62,7 @@ public class Runtime {
@PostConstruct
public void initAndStart() throws Exception {
- logger.info("Start init runtime.");
+ LOGGER.info("Start init runtime.");
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
runnerConfigObserver.registerListener(circulatorContext);
runnerConfigObserver.registerListener(eventSubscriber);
@@ -80,11 +77,11 @@ public class Runtime {
RUNTIME_START_AND_SHUTDOWN.start();
java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- logger.info("try to shutdown server");
+ LOGGER.info("try to shutdown server");
try {
RUNTIME_START_AND_SHUTDOWN.shutdown();
} catch (Exception e) {
- logger.error("err when shutdown runtime ", e);
+ LOGGER.error("err when shutdown runtime ", e);
}
}));
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index 48af27f..b14b551 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -30,12 +30,10 @@ import org.slf4j.LoggerFactory;
/**
* listen the event and offer to queue
- *
- * @author artisan
*/
public class EventBusListener extends ServiceThread {
- private static final Logger logger =
LoggerFactory.getLogger(EventBusListener.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EventBusListener.class);
private final CirculatorContext circulatorContext;
private final EventSubscriber eventSubscriber;
@@ -60,7 +58,7 @@ public class EventBusListener extends ServiceThread {
}
circulatorContext.offerEventRecords(pullRecordList);
} catch (Exception exception) {
- logger.error(getServiceName() + " - event bus pull record
exception, stackTrace - ", exception);
+ LOGGER.error(getServiceName() + " - event bus pull record
exception, stackTrace - ", exception);
pullRecordList.forEach(pullRecord ->
errorHandler.handle(pullRecord, exception));
}
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index f24ed1b..bc671db 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
*/
public class EventRuleTransfer extends ServiceThread {
- private static final Logger logger =
LoggerFactory.getLogger(EventRuleTransfer.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EventRuleTransfer.class);
private volatile Integer batchSize = 100;
@@ -68,18 +68,18 @@ public class EventRuleTransfer extends ServiceThread {
@Override
public void run() {
- List<ConnectRecord> afterTransformConnect = new
CopyOnWriteArrayList<>();;
+ List<ConnectRecord> afterTransformConnect = new
CopyOnWriteArrayList<>();
while (!stopped) {
try {
Map<String, List<ConnectRecord>> eventRecordMap =
circulatorContext.takeEventRecords(batchSize);
if (MapUtils.isEmpty(eventRecordMap)) {
- logger.trace("listen eventRecords is empty, continue by
curTime - {}", System.currentTimeMillis());
+ LOGGER.trace("listen eventRecords is empty, continue by
curTime - {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
}
Map<String, TransformEngine<ConnectRecord>> latestTransformMap
= circulatorContext.getTaskTransformMap();
if (MapUtils.isEmpty(latestTransformMap)) {
- logger.warn("latest transform engine is empty, continue by
curTime - {}", System.currentTimeMillis());
+ LOGGER.warn("latest transform engine is empty, continue by
curTime - {}", System.currentTimeMillis());
this.waitForRunning(3000);
continue;
}
@@ -92,7 +92,7 @@ public class EventRuleTransfer extends ServiceThread {
curEventRecords.forEach(pullRecord -> {
CompletableFuture<Void> transformFuture =
CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
- logger.error("transfer do transform event
record failed,stackTrace-", exception);
+ LOGGER.error("transfer do transform event
record failed, stackTrace-", exception);
errorHandler.handle(pullRecord, exception);
return null;
})
@@ -108,9 +108,9 @@ public class EventRuleTransfer extends ServiceThread {
}
CompletableFuture.allOf(completableFutures.toArray(new
CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
- logger.info("offer target task queues succeed, transforms -
{}", JSON.toJSONString(afterTransformConnect));
+ LOGGER.info("offer target task queues succeed, transforms -
{}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
- logger.error("transfer event record failed, stackTrace-",
exception);
+ LOGGER.error("transfer event record failed, stackTrace-",
exception);
afterTransformConnect.forEach(transferRecord ->
errorHandler.handle(transferRecord, exception));
}
@@ -127,7 +127,7 @@ public class EventRuleTransfer extends ServiceThread {
try {
circulatorContext.releaseTaskTransform();
} catch (Exception e) {
- logger.error(String.format("current thread: %s, error Track: %s ",
getServiceName(), ExceptionUtil.getErrorMessage(e)));
+ LOGGER.error(String.format("current thread: %s, error Track: %s ",
getServiceName(), ExceptionUtil.getErrorMessage(e)));
}
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 85dae3b..b9d175b 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -36,12 +36,10 @@ import org.slf4j.LoggerFactory;
/**
* event target push to sink task
- *
- * @author artisan
*/
public class EventTargetTrigger extends ServiceThread {
- private static final Logger logger =
LoggerFactory.getLogger(EventTargetTrigger.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EventTargetTrigger.class);
private final CirculatorContext circulatorContext;
private final OffsetManager offsetManager;
@@ -49,7 +47,7 @@ public class EventTargetTrigger extends ServiceThread {
private volatile Integer batchSize = 100;
public EventTargetTrigger(CirculatorContext circulatorContext,
OffsetManager offsetManager,
- ErrorHandler errorHandler) {
+ ErrorHandler errorHandler) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
@@ -60,15 +58,15 @@ public class EventTargetTrigger extends ServiceThread {
while (!stopped) {
Map<String, List<ConnectRecord>> targetRecordMap =
circulatorContext.takeTargetRecords(batchSize);
if (MapUtils.isEmpty(targetRecordMap)) {
- logger.trace("current target pusher is empty");
+ LOGGER.trace("current target pusher is empty");
this.waitForRunning(1000);
continue;
}
- if (logger.isDebugEnabled()) {
- logger.debug("start push content by pusher - {}",
JSON.toJSONString(targetRecordMap));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("start push content by pusher - {}",
JSON.toJSONString(targetRecordMap));
}
- for(String runnerName: targetRecordMap.keySet()){
+ for (String runnerName : targetRecordMap.keySet()) {
ExecutorService executorService =
circulatorContext.getExecutorService(runnerName);
executorService.execute(() -> {
SinkTask sinkTask =
circulatorContext.getPusherTaskMap().get(runnerName);
@@ -77,7 +75,7 @@ public class EventTargetTrigger extends ServiceThread {
sinkTask.put(triggerRecords);
offsetManager.commit(triggerRecords);
} catch (Exception exception) {
- logger.error(getServiceName() + " push target
exception, stackTrace-", exception);
+ LOGGER.error(getServiceName() + " push target
exception, stackTrace-", exception);
triggerRecords.forEach(triggerRecord ->
errorHandler.handle(triggerRecord, exception));
}
});
@@ -101,7 +99,7 @@ public class EventTargetTrigger extends ServiceThread {
circulatorContext.releaseExecutorService();
circulatorContext.releaseTriggerTask();
} catch (Exception e) {
- logger.error(String.format("current thread: %s, error Track: %s ",
getServiceName(), ExceptionUtil.getErrorMessage(e)));
+ LOGGER.error(String.format("current thread: %s, error Track: %s ",
getServiceName(), ExceptionUtil.getErrorMessage(e)));
}
}
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 2763be3..3ea21fa 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -21,6 +21,12 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import
org.apache.rocketmq.eventbridge.adapter.runtime.boot.trigger.TriggerTaskContext;
@@ -41,40 +47,40 @@ import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.*;
/**
- * event circulator context for listener, transfer and trigger
+ * Event circulatory context for listener, transfer and trigger
*/
@Component
public class CirculatorContext implements TargetRunnerListener {
- private final static Logger logger =
LoggerFactory.getLogger(LoggerName.EventBus_Listener);
+ private final static Logger LOGGER =
LoggerFactory.getLogger(LoggerName.EVENT_BUS_LISTENER);
@Autowired
private Plugin plugin;
- private static Integer QUEUE_CAPACITY = 50000;
+ private static final Integer QUEUE_CAPACITY = 50000;
- private BlockingQueue<ConnectRecord> eventQueue = new
LinkedBlockingQueue<>(50000);
+ private final BlockingQueue<ConnectRecord> eventQueue = new
LinkedBlockingQueue<>(50000);
- private BlockingQueue<ConnectRecord> targetQueue = new
LinkedBlockingQueue<>(50000);
+ private final BlockingQueue<ConnectRecord> targetQueue = new
LinkedBlockingQueue<>(50000);
- private Map<String/*RunnerName*/, TargetRunnerConfig> runnerConfigMap =
new ConcurrentHashMap<>(30);
+ private final Map<String/*RunnerName*/, TargetRunnerConfig>
runnerConfigMap = new ConcurrentHashMap<>(30);
- private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>>
eventQueueMap = new ConcurrentHashMap<>(30);
+ private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>>
eventQueueMap = new ConcurrentHashMap<>(30);
- private Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>>
targetQueueMap = new ConcurrentHashMap<>(30);
+ private final Map<String/*RunnerName*/, BlockingQueue<ConnectRecord>>
targetQueueMap = new ConcurrentHashMap<>(30);
- private Map<String/*RunnerName*/, TransformEngine<ConnectRecord>>
taskTransformMap = new ConcurrentHashMap<>(20);
+ private final Map<String/*RunnerName*/, TransformEngine<ConnectRecord>>
taskTransformMap = new ConcurrentHashMap<>(20);
- private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new
ConcurrentHashMap<>(20);
+ private final Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new
ConcurrentHashMap<>(20);
- private Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap = new
ConcurrentHashMap<>(10);
+ private final Map<String/*RunnerName*/, ExecutorService> pusherExecutorMap
= new ConcurrentHashMap<>(10);
/**
* initial targetRunnerMap, taskTransformMap, pusherTaskMap
- * @param targetRunnerConfigs
+ *
+ * @param targetRunnerConfigs Configurations for the target runner
*/
public void initCirculatorContext(Set<TargetRunnerConfig>
targetRunnerConfigs) {
if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
@@ -102,10 +108,11 @@ public class CirculatorContext implements
TargetRunnerListener {
/**
* get target runner config by runner name
+ *
* @param runnerName
* @return
*/
- public TargetRunnerConfig getRunnerConfig(String runnerName){
+ public TargetRunnerConfig getRunnerConfig(String runnerName) {
return runnerConfigMap.get(runnerName);
}
@@ -122,21 +129,23 @@ public class CirculatorContext implements
TargetRunnerListener {
/**
* update record queue map
+ *
* @param recordMap
* @param eventQueueMap
*/
- private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>>
recordMap, Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
- try{
- for(String runnerName : recordMap.keySet()){
+ private boolean updateRecordQueueMap(Map<String, List<ConnectRecord>>
recordMap,
+ Map<String, BlockingQueue<ConnectRecord>> eventQueueMap) {
+ try {
+ for (String runnerName : recordMap.keySet()) {
BlockingQueue<ConnectRecord> recordQueue =
eventQueueMap.get(runnerName);
- if(CollectionUtils.isEmpty(recordQueue)){
+ if (CollectionUtils.isEmpty(recordQueue)) {
recordQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
}
recordQueue.addAll(recordMap.get(runnerName));
eventQueueMap.put(runnerName, recordQueue);
}
return true;
- }catch (Exception exception){
+ } catch (Exception exception) {
return false;
}
}
@@ -147,7 +156,7 @@ public class CirculatorContext implements
TargetRunnerListener {
* @return
*/
public Map<String, List<ConnectRecord>> takeEventRecords(int batchSize) {
- if(eventQueue.isEmpty()){
+ if (eventQueue.isEmpty()) {
return null;
}
List<ConnectRecord> eventRecords = Lists.newArrayList();
@@ -171,11 +180,12 @@ public class CirculatorContext implements
TargetRunnerListener {
/**
* take batch target records
+ *
* @param batchSize
* @return
*/
public Map<String, List<ConnectRecord>> takeTargetRecords(Integer
batchSize) {
- if(targetQueue.isEmpty()){
+ if (targetQueue.isEmpty()) {
return null;
}
List<ConnectRecord> targetRecords = Lists.newArrayList();
@@ -185,6 +195,7 @@ public class CirculatorContext implements
TargetRunnerListener {
/**
* user runner-name as key
+ *
* @param eventRecords
* @return
*/
@@ -193,7 +204,7 @@ public class CirculatorContext implements
TargetRunnerListener {
for (ConnectRecord connectRecord : eventRecords) {
String runnerName =
connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
List<ConnectRecord> curEventRecords =
eventRecordMap.get(runnerName);
- if(CollectionUtils.isEmpty(curEventRecords)){
+ if (CollectionUtils.isEmpty(curEventRecords)) {
curEventRecords = Lists.newArrayList();
}
curEventRecords.add(connectRecord);
@@ -204,15 +215,17 @@ public class CirculatorContext implements
TargetRunnerListener {
/**
* get specific thread pool by push name
+ *
* @param runnerName
* @return
*/
- public ExecutorService getExecutorService(String runnerName){
+ public ExecutorService getExecutorService(String runnerName) {
return pusherExecutorMap.get(runnerName);
}
/**
* refresh target runner where config changed
+ *
* @param targetRunnerConfig
* @param refreshTypeEnum
*/
@@ -225,7 +238,7 @@ public class CirculatorContext implements
TargetRunnerListener {
TransformEngine<ConnectRecord> transformChain = new
TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
taskTransformMap.put(runnerName, transformChain);
- int endIndex = targetRunnerConfig.getComponents().size() -1;
+ int endIndex = targetRunnerConfig.getComponents().size() - 1;
TargetKeyValue targetKeyValue = new
TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
pusherTaskMap.put(runnerName, sinkTask);
@@ -234,16 +247,16 @@ public class CirculatorContext implements
TargetRunnerListener {
pusherExecutorMap.put(runnerName,
initDefaultThreadPoolExecutor(runnerName));
}
- if(logger.isInfoEnabled()){
- logger.info("runnerName -{}- refresh context by refresh
type -{}- succeed", runnerName, refreshTypeEnum.name());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("runnerName -{}- refresh context by refresh
type -{}- succeed", runnerName, refreshTypeEnum.name());
}
break;
case DELETE:
runnerConfigMap.remove(runnerName);
taskTransformMap.remove(runnerName);
pusherTaskMap.remove(runnerName);
- if(logger.isInfoEnabled()){
- logger.info("runnerName -{}- remove context succeed",
runnerName);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("runnerName -{}- remove context succeed",
runnerName);
}
break;
default:
@@ -253,16 +266,18 @@ public class CirculatorContext implements
TargetRunnerListener {
/**
* init default thread poll param, support auto config
+ *
* @param threadPollName
* @return
*/
private ExecutorService initDefaultThreadPoolExecutor(String
threadPollName) {
return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(300),
ThreadUtils.newThreadFactory(threadPollName, false));
+ new LinkedBlockingQueue<>(300),
ThreadUtils.newThreadFactory(threadPollName, false));
}
/**
* init target sink task
+ *
* @param targetKeyValue
* @return
*/
@@ -286,13 +301,12 @@ public class CirculatorContext implements
TargetRunnerListener {
Plugin.compareAndSwapLoaders(loader);
}
return sinkTask;
- }catch (Exception exception) {
- logger.error("task class -" + taskClass + "- init its sinkTask
failed, ex- ", exception);
+ } catch (Exception exception) {
+ LOGGER.error("task class -" + taskClass + "- init its sinkTask
failed, ex- ", exception);
}
return null;
}
-
public void releaseTaskTransform() throws Exception {
for (Map.Entry<String, TransformEngine<ConnectRecord>> taskTransform :
taskTransformMap.entrySet()) {
String runnerName = taskTransform.getKey();
@@ -303,7 +317,7 @@ public class CirculatorContext implements
TargetRunnerListener {
}
public void releaseTriggerTask() {
- for (Map.Entry<String, SinkTask> triggerTask:
pusherTaskMap.entrySet()) {
+ for (Map.Entry<String, SinkTask> triggerTask :
pusherTaskMap.entrySet()) {
SinkTask sinkTask = triggerTask.getValue();
String runnerName = triggerTask.getKey();
sinkTask.stop();
@@ -312,7 +326,7 @@ public class CirculatorContext implements
TargetRunnerListener {
}
public void releaseExecutorService() throws Exception {
- for (Map.Entry<String, ExecutorService> pusherExecutor:
pusherExecutorMap.entrySet()) {
+ for (Map.Entry<String, ExecutorService> pusherExecutor :
pusherExecutorMap.entrySet()) {
ExecutorService pusher = pusherExecutor.getValue();
ShutdownUtils.shutdownThreadPool(pusher);
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index be4db9b..dc18ab7 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -29,6 +29,7 @@ public abstract class EventSubscriber implements
TargetRunnerListener {
/**
* Refresh subscriber inner data when runner keys changed
+ *
* @param subscribeRunnerKeys
* @param refreshTypeEnum
*/
@@ -42,7 +43,7 @@ public abstract class EventSubscriber implements
TargetRunnerListener {
public abstract List<ConnectRecord> pull();
/**
- * Commit the connect records.
+ * Commit connect records.
*
* @param connectRecordList
*/
@@ -54,12 +55,13 @@ public abstract class EventSubscriber implements
TargetRunnerListener {
public abstract void close();
/**
- * Put the connect record to the eventbus.
+ * Put connect record to the eventbus.
+ *
* @param eventBusName
* @param connectRecord
* @param delaySec
*/
- public boolean put(String eventBusName, ConnectRecord connectRecord, int
delaySec){
+ public boolean put(String eventBusName, ConnectRecord connectRecord, int
delaySec) {
// convert the eventBusName to Topic ?
return true;
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
index f9e879a..5737892 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/transfer/TransformEngine.java
@@ -37,7 +37,7 @@ import java.util.Objects;
public class TransformEngine<R extends ConnectRecord> implements AutoCloseable
{
- private static final Logger logger =
LoggerFactory.getLogger(LoggerName.EventRule_Transfer);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.EVENT_RULE_TRANSFER);
private final List<Transform> transformList;
@@ -76,18 +76,19 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
transform.init(transformConfig);
this.transformList.add(transform);
} catch (Exception e) {
- logger.error("transform new instance error", e);
+ LOGGER.error("transform new instance error", e);
}
}
}
/**
* format listener and pusher key
+ *
* @param components
* @return
*/
private TargetKeyValue formatTargetKey(List<Map<String, String>>
components) {
- if(CollectionUtils.isEmpty(components)){
+ if (CollectionUtils.isEmpty(components)) {
return null;
}
int startIndex = 0;
@@ -101,6 +102,7 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
/**
* transform event record for target record
+ *
* @param connectRecord
* @return
*/
@@ -120,10 +122,11 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
/**
* get task config value by key
+ *
* @param configKey
* @return
*/
- public String getConnectConfig(String configKey){
+ public String getConnectConfig(String configKey) {
return config.getString(configKey);
}
@@ -149,8 +152,10 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
TransformEngine<?> that = (TransformEngine<?>) o;
return transformList.equals(that.transformList) &&
config.equals(that.config);
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
index 6af6382..6edb06d 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java
@@ -43,7 +43,7 @@ public class TriggerTaskContext implements SinkTaskContext {
*/
private final TargetKeyValue taskConfig;
- private static final Logger logger =
LoggerFactory.getLogger(LoggerName.EventTarget_Trigger);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.EVENT_TARGET_TRIGGER);
private final Map<MessageQueue, Long> messageQueuesOffsetMap = new
ConcurrentHashMap<>(64);
@@ -61,20 +61,20 @@ public class TriggerTaskContext implements SinkTaskContext {
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset
recordOffset) {
if (null == recordPartition || null == recordPartition.getPartition()
|| null == recordOffset || null == recordOffset.getOffset()) {
- logger.warn("recordPartition {} info is null or recordOffset {}
info is null", recordPartition, recordOffset);
+ LOGGER.warn("recordPartition {} info is null or recordOffset {}
info is null", recordPartition, recordOffset);
return;
}
String brokerName = (String)
recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String)
recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) ||
null == queueId) {
- logger.warn("brokerName is null or queueId is null or queueName is
null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
+ LOGGER.warn("brokerName is null or queueId is null or queueName is
null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
return;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName,
queueId);
Long offset = Long.valueOf((String)
recordOffset.getOffset().get(QUEUE_OFFSET));
if (null == offset) {
- logger.warn("resetOffset, offset is null");
+ LOGGER.warn("resetOffset, offset is null");
return;
}
messageQueuesOffsetMap.put(messageQueue, offset);
@@ -83,12 +83,12 @@ public class TriggerTaskContext implements SinkTaskContext {
@Override
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
if (MapUtils.isEmpty(offsets)) {
- logger.warn("resetOffset, offsets {} is null", offsets);
+ LOGGER.warn("resetOffset, offsets {} is null", offsets);
return;
}
for (Map.Entry<RecordPartition, RecordOffset> entry :
offsets.entrySet()) {
if (null == entry || null == entry.getKey() || null ==
entry.getKey().getPartition() || null == entry.getValue() || null ==
entry.getValue().getOffset()) {
- logger.warn("recordPartition {} info is null or recordOffset
{} info is null, entry {}", entry);
+ LOGGER.warn("recordPartition {} info is null or recordOffset
{} info is null, entry {}", entry);
continue;
}
RecordPartition recordPartition = entry.getKey();
@@ -96,14 +96,14 @@ public class TriggerTaskContext implements SinkTaskContext {
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String)
recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic)
|| null == queueId) {
- logger.warn("brokerName is null or queueId is null or
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId,
topic);
+ LOGGER.warn("brokerName is null or queueId is null or
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId,
topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName,
queueId);
RecordOffset recordOffset = entry.getValue();
Long offset = Long.valueOf((String)
recordOffset.getOffset().get(QUEUE_OFFSET));
if (null == offset) {
- logger.warn("resetOffset, offset is null");
+ LOGGER.warn("resetOffset, offset is null");
continue;
}
messageQueuesOffsetMap.put(messageQueue, offset);
@@ -113,24 +113,24 @@ public class TriggerTaskContext implements
SinkTaskContext {
@Override
public void pause(List<RecordPartition> recordPartitions) {
if (recordPartitions == null || recordPartitions.size() == 0) {
- logger.warn("recordPartitions is null or recordPartitions.size()
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+ LOGGER.warn("recordPartitions is null or recordPartitions.size()
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
return;
}
for (RecordPartition recordPartition : recordPartitions) {
if (null == recordPartition || null ==
recordPartition.getPartition()) {
- logger.warn("recordPartition {} info is null",
recordPartition);
+ LOGGER.warn("recordPartition {} info is null",
recordPartition);
continue;
}
String brokerName = (String)
recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String)
recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic)
|| null == queueId) {
- logger.warn("brokerName is null or queueId is null or
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId,
topic);
+ LOGGER.warn("brokerName is null or queueId is null or
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId,
topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName,
queueId);
if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
- logger.warn("sink task current messageQueuesOffsetMap {} not
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+ LOGGER.warn("sink task current messageQueuesOffsetMap {} not
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
continue;
}
messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
@@ -140,24 +140,24 @@ public class TriggerTaskContext implements
SinkTaskContext {
@Override
public void resume(List<RecordPartition> recordPartitions) {
if (recordPartitions == null || recordPartitions.size() == 0) {
- logger.warn("recordPartitions is null or recordPartitions.size()
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
+ LOGGER.warn("recordPartitions is null or recordPartitions.size()
is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
return;
}
for (RecordPartition recordPartition : recordPartitions) {
if (null == recordPartition || null ==
recordPartition.getPartition()) {
- logger.warn("recordPartition {} info is null",
recordPartition);
+ LOGGER.warn("recordPartition {} info is null",
recordPartition);
continue;
}
String brokerName = (String)
recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String)
recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic)
|| null == queueId) {
- logger.warn("brokerName is null or queueId is null or
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId,
topic);
+ LOGGER.warn("brokerName is null or queueId is null or
queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId,
topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName,
queueId);
if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
- logger.warn("sink task current messageQueuesOffsetMap {} not
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
+ LOGGER.warn("sink task current messageQueuesOffsetMap {} not
contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
continue;
}
messageQueuesStateMap.remove(messageQueue);
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
index 39734c4..2174060 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/LoggerName.java
@@ -21,8 +21,8 @@ package
org.apache.rocketmq.eventbridge.adapter.runtime.common;
* Define all the logger name of the runtime.
*/
public class LoggerName {
- public static final String EventBridge_RUNTIMER = "EventBridgeRuntimer";
- public static final String EventBus_Listener = "EventBusListener";
- public static final String EventRule_Transfer = "EventRuleTransfer";
- public static final String EventTarget_Trigger = "EventTargetTrigger";
+ public static final String EVENT_BRIDGE_RUNTIMER = "EventBridgeRuntimer";
+ public static final String EVENT_BUS_LISTENER = "EventBusListener";
+ public static final String EVENT_RULE_TRANSFER = "EventRuleTransfer";
+ public static final String EVENT_TARGET_TRIGGER = "EventTargetTrigger";
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
index 400cf47..027925e 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/ServiceThread.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class ServiceThread extends AbstractStartAndShutdown
implements Runnable {
- private static final Logger logger =
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private static final long JOIN_TIME = 90 * 1000;
@@ -45,14 +45,14 @@ public abstract class ServiceThread extends
AbstractStartAndShutdown implements
public abstract String getServiceName();
public void start() {
- logger.info("Try to start service thread:{} started:{} lastThread:{}",
getServiceName(), hasNotified.get(), thread);
+ LOGGER.info("Try to start service thread:{} started:{} lastThread:{}",
getServiceName(), hasNotified.get(), thread);
if (!hasNotified.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread.setDaemon(isDaemon);
this.thread.start();
- logger.info("Start service thread:{} started:{} lastThread:{}",
getServiceName(), hasNotified.get(), thread);
+ LOGGER.info("Start service thread:{} started:{} lastThread:{}",
getServiceName(), hasNotified.get(), thread);
}
public void shutdown() {
@@ -61,7 +61,7 @@ public abstract class ServiceThread extends
AbstractStartAndShutdown implements
public void shutdown(final boolean interrupt) {
this.stopped = true;
- logger.info("shutdown thread " + this.getServiceName() + " interrupt "
+ interrupt);
+ LOGGER.info("shutdown thread " + this.getServiceName() + " interrupt "
+ interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
@@ -77,10 +77,10 @@ public abstract class ServiceThread extends
AbstractStartAndShutdown implements
this.thread.join(this.getJointime());
}
long eclipseTime = System.currentTimeMillis() - beginTime;
- logger.info("join thread " + this.getServiceName() + " eclipse
time(ms) " + eclipseTime + " "
+ LOGGER.info("join thread " + this.getServiceName() + " eclipse
time(ms) " + eclipseTime + " "
+ this.getJointime());
} catch (InterruptedException e) {
- logger.error("Interrupted", e);
+ LOGGER.error("Interrupted", e);
}
}
@@ -94,7 +94,7 @@ public abstract class ServiceThread extends
AbstractStartAndShutdown implements
public void stop(final boolean interrupt) {
this.stopped = true;
- logger.info("stop thread " + this.getServiceName() + " interrupt " +
interrupt);
+ LOGGER.info("stop thread " + this.getServiceName() + " interrupt " +
interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
@@ -107,7 +107,7 @@ public abstract class ServiceThread extends
AbstractStartAndShutdown implements
public void makeStop() {
this.stopped = true;
- logger.info("makestop thread " + this.getServiceName());
+ LOGGER.info("makestop thread " + this.getServiceName());
}
public void wakeup() {
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
index aa696e8..58d8e2b 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetKeyValue.java
@@ -139,15 +139,17 @@ public class TargetKeyValue implements KeyValue,
Serializable {
this.properties = properties;
}
- public KeyValue putAll(Map<String,String> configProps){
+ public KeyValue putAll(Map<String, String> configProps) {
this.properties.putAll(configProps);
return this;
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
TargetKeyValue that = (TargetKeyValue) o;
return Objects.equals(targetKeyId, that.targetKeyId) &&
Objects.equals(properties, that.properties);
}
@@ -160,9 +162,9 @@ public class TargetKeyValue implements KeyValue,
Serializable {
@Override
public String toString() {
return "TargetKeyValue{" +
- "targetKeyId='" + targetKeyId + '\'' +
- ", properties=" + properties +
- '}';
+ "targetKeyId='" + targetKeyId + '\'' +
+ ", properties=" + properties +
+ '}';
}
public String getTargetKeyId() {
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
index 6cbc2f7..d3ab201 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/entity/TargetRunnerConfig.java
@@ -71,19 +71,11 @@ public class TargetRunnerConfig implements Serializable {
private boolean isEqualsComponents(List<Map<String, String>> source,
List<Map<String, String>> target) {
if (source == null || target == null) {
- if (source != target) {
- return false;
- } else {
- return true;
- }
+ return source == target;
}
if (source.isEmpty() || target.isEmpty()) {
- if (source.isEmpty() && target.isEmpty()) {
- return true;
- } else {
- return false;
- }
+ return source.isEmpty() && target.isEmpty();
}
if (source.size() != target.size()) {
@@ -99,10 +91,8 @@ public class TargetRunnerConfig implements Serializable {
String element = targetComponent.get(entry.getKey());
if (element == null && entry.getValue() == null) {
return true;
- } else if (element.equals(entry.getValue())) {
- return true;
} else {
- return false;
+ return element.equals(entry.getValue());
}
}
}
@@ -117,7 +107,7 @@ public class TargetRunnerConfig implements Serializable {
return components.get(0).get(ACCOUNT_ID);
}
- public SubscribeRunnerKeys getSubscribeRunnerKeys(){
+ public SubscribeRunnerKeys getSubscribeRunnerKeys() {
SubscribeRunnerKeys subscribeRunnerKeys = new SubscribeRunnerKeys();
subscribeRunnerKeys.setRunnerName(this.getName());
subscribeRunnerKeys.setAccountId(this.getAccountId());
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
index 26475f2..cf14e66 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/Plugin.java
@@ -19,6 +19,7 @@ package
org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
+import java.util.ArrayList;
import org.apache.commons.lang3.StringUtils;
import org.reflections.Configuration;
import org.reflections.Reflections;
@@ -39,7 +40,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.Arrays;
@Component
public class Plugin extends URLClassLoader {
@@ -48,7 +54,7 @@ public class Plugin extends URLClassLoader {
@Value("${runtime.pluginpath:}")
private String pluginPath;
- private Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
+ private final Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
public Plugin() {
super(new URL[0], Plugin.class.getClassLoader());
@@ -62,7 +68,7 @@ public class Plugin extends URLClassLoader {
}
}
- private List<String> initPluginPath(String plugin){
+ private List<String> initPluginPath(String plugin) {
List<String> pluginPaths = new ArrayList<>();
if (StringUtils.isNotEmpty(plugin)) {
String[] strArr = plugin.split(",");
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
index 2c11e1a..0cae63d 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/plugin/PluginUtils.java
@@ -16,6 +16,16 @@
*/
package org.apache.rocketmq.eventbridge.adapter.runtime.common.plugin;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,7 +33,6 @@ import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.*;
import java.util.regex.Pattern;
public class PluginUtils {
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
index e16d4ec..04aa07b 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/common/store/FileBaseKeyValueStore.java
@@ -37,7 +37,7 @@ import java.util.Map;
*/
public class FileBaseKeyValueStore<K, V> extends MemoryBasedKeyValueStore<K,
V> {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private String configFilePath;
private Converter keyConverter;
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
index ea5fd11..cf0cc79 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfigProps.java
@@ -29,25 +29,24 @@ import java.util.Properties;
*/
public class RuntimeConfigProps {
- private final static Logger logger =
LoggerFactory.getLogger(RuntimeConfigProps.class);
+ private final static Logger LOGGER =
LoggerFactory.getLogger(RuntimeConfigProps.class);
private Properties properties;
- private RuntimeConfigProps(){
+ private RuntimeConfigProps() {
try {
- properties =
PropertiesLoaderUtils.loadAllProperties("runtime.properties");
+ properties =
PropertiesLoaderUtils.loadAllProperties("runtime.properties");
} catch (IOException exception) {
- logger.error("runtime load properties failed, stackTrace-",
exception);
+ LOGGER.error("runtime load properties failed, stackTrace-",
exception);
}
}
- private static class RuntimerConfigPropsHolder{
- private static final RuntimeConfigProps instance = new
RuntimeConfigProps();
+ private static class RuntimerConfigPropsHolder {
+ private static final RuntimeConfigProps INSTANCE = new
RuntimeConfigProps();
}
- public static RuntimeConfigProps build(){
- return RuntimerConfigPropsHolder.instance;
+ public static RuntimeConfigProps build() {
+ return RuntimerConfigPropsHolder.INSTANCE;
}
-
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
index 56fa123..00b3c4a 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/config/RuntimeConfiguration.java
@@ -26,7 +26,6 @@ import
org.apache.rocketmq.eventbridge.domain.repository.EventTargetRunnerReposi
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
@Configuration
public class RuntimeConfiguration {
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
index dfad654..34800ba 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/JsonConverter.java
@@ -30,7 +30,7 @@ import java.io.UnsupportedEncodingException;
*/
public class JsonConverter implements Converter {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private Class clazz;
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
index b704c7b..2c31046 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/ListConverter.java
@@ -32,7 +32,7 @@ import java.util.List;
*/
public class ListConverter implements Converter<List> {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
private Class clazz;
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
index 4a6d9ef..5b3af12 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/converter/RecordOffsetConverter.java
@@ -31,7 +31,7 @@ import java.io.UnsupportedEncodingException;
*/
public class RecordOffsetConverter implements Converter<RecordOffset> {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.EventBridge_RUNTIMER);
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.EVENT_BRIDGE_RUNTIMER);
@Override
public byte[] objectToByte(RecordOffset recordOffset) {
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
index b7cee55..a988981 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/error/ErrorHandler.java
@@ -91,7 +91,7 @@ public class ErrorHandler {
return -1;
}
int pow = (int) Math.pow(2, 3 + retryTimes);
- return (pow > 512 ? 512 : pow);
+ return pow > 512 ? 512 : pow;
default:
return -1;
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
index 0615e35..589238f 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/AbstractTargetRunnerConfigObserver.java
@@ -46,7 +46,7 @@ public abstract class AbstractTargetRunnerConfigObserver
implements TargetRunner
@Override
public Set<SubscribeRunnerKeys> getSubscribeRunnerKeys() {
- if(CollectionUtils.isEmpty(targetRunnerConfigs)){
+ if (CollectionUtils.isEmpty(targetRunnerConfigs)) {
return null;
}
return targetRunnerConfigs.stream().map(item -> {
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
index cbd01e0..85547f2 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java
@@ -38,7 +38,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import
org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
-import org.springframework.stereotype.Component;
@Slf4j
//@Component
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
index e6a9d1d..30bb31e 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ExceptionUtil.java
@@ -25,7 +25,7 @@ import java.io.StringWriter;
public class ExceptionUtil {
- private static final Logger logger =
LoggerFactory.getLogger(ExceptionUtil.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ExceptionUtil.class);
public static String getErrorMessage(Throwable e) {
if (null == e) {
@@ -40,7 +40,7 @@ public class ExceptionUtil {
StringBuffer buffer = stringWriter.getBuffer();
return buffer.toString();
} catch (Throwable ex) {
- logger.error("", ex);
+ LOGGER.error("", ex);
}
return null;
}
diff --git
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
index cf8679d..2405a8f 100644
---
a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
+++
b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/ShutdownUtils.java
@@ -20,14 +20,12 @@ package
org.apache.rocketmq.eventbridge.adapter.runtime.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ShutdownUtils {
- private static final Logger logger =
LoggerFactory.getLogger(ShutdownUtils.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ShutdownUtils.class);
public static void shutdownThreadPool(ExecutorService executor) {
if (executor != null) {
@@ -35,7 +33,7 @@ public class ShutdownUtils {
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
- logger.error("Shutdown threadPool failed", e);
+ LOGGER.error("Shutdown threadPool failed", e);
}
if (!executor.isTerminated()) {
executor.shutdownNow();
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 4d32dad..58dda61 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -27,6 +27,13 @@ import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -57,7 +64,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +78,7 @@ import java.util.stream.Collectors;
@DependsOn("flyway")
public class RocketMQEventSubscriber extends EventSubscriber {
- private static final Logger logger =
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
@Autowired
private EventDataRepository eventDataRepository;
@@ -98,7 +104,7 @@ public class RocketMQEventSubscriber extends EventSubscriber
{
public static final String MSG_ID = "msgId";
@PostConstruct
- public void initRocketMQEventSubscriber(){
+ public void initRocketMQEventSubscriber() {
this.initMqProperties();
this.initConsumeWorkers();
}
@@ -123,18 +129,18 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
ArrayList<MessageExt> messages = new ArrayList<>();
messageBuffer.drainTo(messages, pullBatchSize);
if (CollectionUtils.isEmpty(messages)) {
- logger.trace("consumer poll message empty.");
+ LOGGER.trace("consumer poll message empty.");
return null;
}
List<ConnectRecord> connectRecords = new CopyOnWriteArrayList<>();
List<CompletableFuture<Void>> completableFutures =
Lists.newArrayList();
- messages.forEach(item->{
- CompletableFuture<Void> recordCompletableFuture =
CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))
- .exceptionally((exception) -> {
- logger.error("execute completable job
failed,stackTrace-", exception);
- return null;
- })
- .thenAccept(connectRecords::add);
+ messages.forEach(item -> {
+ CompletableFuture<Void> recordCompletableFuture =
CompletableFuture.supplyAsync(() -> convertToSinkRecord(item))
+ .exceptionally((exception) -> {
+ LOGGER.error("execute completable job failed", exception);
+ return null;
+ })
+ .thenAccept(connectRecords::add);
completableFutures.add(recordCompletableFuture);
});
@@ -145,24 +151,25 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
/**
* group by runner name batch commit
+ *
* @param connectRecordList
*/
@Override
public void commit(List<ConnectRecord> connectRecordList) {
- if(CollectionUtils.isEmpty(connectRecordList)){
- logger.warn("commit event record data empty!");
+ if (CollectionUtils.isEmpty(connectRecordList)) {
+ LOGGER.warn("commit event record data empty!");
return;
}
String runnerName =
connectRecordList.iterator().next().getExtension(RuntimeConfigDefine.RUNNER_NAME);
List<String> msgIds = connectRecordList.stream().map(item ->
item.getPosition()
-
.getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
+
.getPartition().getPartition().get(MSG_ID).toString()).collect(Collectors.toList());
consumeWorkerMap.get(runnerName).commit(msgIds);
}
@Override
public void close() {
for (Map.Entry<String, ConsumeWorker> item :
consumeWorkerMap.entrySet()) {
- ConsumeWorker consumeWorker = item.getValue();
+ ConsumeWorker consumeWorker = item.getValue();
consumeWorker.shutdown();
}
}
@@ -187,7 +194,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
clientConfig.setNameSrvAddr(namesrvAddr);
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
- AccessChannel.CLOUD : AccessChannel.LOCAL);
+ AccessChannel.CLOUD : AccessChannel.LOCAL);
clientConfig.setNamespace(namespace);
this.clientConfig = clientConfig;
@@ -196,7 +203,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
}
if (StringUtils.isNotBlank(socks5UserName) &&
StringUtils.isNotBlank(socks5Password)
- && StringUtils.isNotBlank(socks5Endpoint)) {
+ && StringUtils.isNotBlank(socks5Endpoint)) {
SocksProxyConfig proxyConfig = new SocksProxyConfig();
proxyConfig.setUsername(socks5UserName);
proxyConfig.setPassword(socks5Password);
@@ -206,8 +213,8 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
this.socksProxy = new Gson().toJson(proxyConfigMap);
}
- }catch (Exception exception){
- logger.error("init rocket mq property exception, stack trace-",
exception);
+ } catch (Exception exception) {
+ LOGGER.error("init rocket mq property exception, stack trace-",
exception);
}
}
@@ -215,8 +222,8 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
* init rocket mq pull consumer
*/
private void initConsumeWorkers() {
- Set<SubscribeRunnerKeys> subscribeRunnerKeysSet =
runnerConfigObserver.getSubscribeRunnerKeys();
- if(subscribeRunnerKeysSet == null || subscribeRunnerKeysSet.isEmpty()){
+ Set<SubscribeRunnerKeys> subscribeRunnerKeysSet =
runnerConfigObserver.getSubscribeRunnerKeys();
+ if (subscribeRunnerKeysSet == null ||
subscribeRunnerKeysSet.isEmpty()) {
return;
}
for (SubscribeRunnerKeys subscribeRunnerKeys : subscribeRunnerKeysSet)
{
@@ -229,6 +236,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
/**
* first init default rocketmq pull consumer
+ *
* @return
*/
public LitePullConsumer initLitePullConsumer(SubscribeRunnerKeys
subscribeRunnerKeys) {
@@ -245,7 +253,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
pullConsumer.attachTopic(topic, "*");
pullConsumer.startup();
} catch (Exception exception) {
- logger.error("init default pull consumer exception, topic -" +
topic + "-stackTrace-", exception);
+ LOGGER.error("init default pull consumer exception, topic -" +
topic + "-stackTrace-", exception);
throw new EventBridgeException(" init rocketmq consumer failed");
}
return pullConsumer;
@@ -265,6 +273,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
/**
* MessageExt convert to connect record
+ *
* @param messageExt
* @return
*/
@@ -311,7 +320,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
private void putConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
ConsumeWorker consumeWorker =
consumeWorkerMap.get(subscribeRunnerKeys.getRunnerName());
- if (!Objects.isNull(consumeWorker)){
+ if (!Objects.isNull(consumeWorker)) {
consumeWorker.shutdown();
}
LitePullConsumer litePullConsumer =
initLitePullConsumer(subscribeRunnerKeys);
@@ -322,7 +331,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
private void removeConsumeWorker(SubscribeRunnerKeys subscribeRunnerKeys) {
ConsumeWorker consumeWorker =
consumeWorkerMap.remove(subscribeRunnerKeys.getRunnerName());
- if (!Objects.isNull(consumeWorker)){
+ if (!Objects.isNull(consumeWorker)) {
consumeWorker.shutdown();
}
}
@@ -352,12 +361,12 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
messageBuffer.put(message);
}
} catch (Exception exception) {
- logger.error(getServiceName() + " -
RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
+ LOGGER.error(getServiceName() + " -
RocketMQEventSubscriber pull record exception, stackTrace - ", exception);
}
}
}
- public void commit(List<String> messageIds){
+ public void commit(List<String> messageIds) {
this.pullConsumer.commit(messageIds);
}
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
index b37b851..f42cf9b 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ClientConfig.java
@@ -20,10 +20,6 @@ package
org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer.consum
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:08 上午
- */
public class ClientConfig {
private int rmqPullMessageCacheCapacity = 1000;
private int rmqPullMessageBatchNums = 20;
@@ -59,7 +55,7 @@ public class ClientConfig {
}
public void setConsumeFromWhere(
- final ConsumeFromWhere consumeFromWhere) {
+ final ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
@@ -119,7 +115,6 @@ public class ClientConfig {
this.accessChannel = accessChannel;
}
-
public static ClientConfig cloneConfig(ClientConfig clientConfig) {
ClientConfig newConfig = new ClientConfig();
newConfig.setRmqPullMessageBatchNums(clientConfig.getRmqPullMessageBatchNums());
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
index 9923eab..1755490 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/ConsumeRequest.java
@@ -21,10 +21,6 @@ import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:07 上午
- */
public class ConsumeRequest {
private final MessageExt messageExt;
private final MessageQueue messageQueue;
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
index 667f17a..834b7c1 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumer.java
@@ -23,10 +23,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.time.Duration;
import java.util.List;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:09 上午
- */
public interface LitePullConsumer {
void startup() throws MQClientException;
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
index a76012e..888d36d 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LitePullConsumerImpl.java
@@ -48,12 +48,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:10 上午
- */
public class LitePullConsumerImpl implements LitePullConsumer {
- private static final Logger log =
LoggerFactory.getLogger(LitePullConsumerImpl.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LitePullConsumerImpl.class);
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
@@ -83,7 +79,7 @@ public class LitePullConsumerImpl implements LitePullConsumer
{
@Override
public void startup() throws MQClientException {
rocketmqPullConsumer.start();
- log.info("RocketmqPullConsumer start.");
+ LOGGER.info("RocketmqPullConsumer start.");
}
@Override
@@ -98,7 +94,7 @@ public class LitePullConsumerImpl implements LitePullConsumer
{
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
- log.error("Shutdown threadPool failed", e);
+ LOGGER.error("Shutdown threadPool failed", e);
}
if (!executor.isTerminated()) {
executor.shutdownNow();
@@ -114,7 +110,7 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
public void messageQueueChanged(String topic, Set<MessageQueue>
mqAll, Set<MessageQueue> mqDivided) {
submitPullTask(topic, tag, mqDivided);
localMessageCache.shrinkPullOffsetTable(mqDivided);
- log.info("Load balance result of topic {} changed, mqAll {},
mqDivided {}.", topic, mqAll, mqDivided);
+ LOGGER.info("Load balance result of topic {} changed, mqAll
{}, mqDivided {}.", topic, mqAll, mqDivided);
}
});
}
@@ -156,7 +152,7 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
}
}
if (CollectionUtils.isEmpty(assignedQueues)) {
- log.warn("Not found any messageQueue, topic:{}", topic);
+ LOGGER.warn("Not found any messageQueue, topic:{}", topic);
return;
}
@@ -167,10 +163,10 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
try {
PullTask pullTask = new PullTask(messageQueue, tag);
pullImmediately(pullTask);
- log.info("Submit pullTask:{}", messageQueue);
+ LOGGER.info("Submit pullTask:{}", messageQueue);
} catch (Exception e) {
- log.error("Failed submit pullTask:{}, {}, wait next
balancing", topic, messageQueue, e);
- // 添加pull失败,等待下次 rebalance
+ LOGGER.error("Failed submit pullTask:{}, {}, wait next
balancing", topic, messageQueue, e);
+ // Failed to add pull task, waiting for the next round of
re-balance
processQueue =
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().remove(messageQueue);
if (processQueue != null) {
@@ -215,13 +211,13 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
public void run() {
try {
if
(!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState()))
{
- log.warn("RocketmqPullConsumer not running, pullTask
exit.");
+ LOGGER.warn("RocketmqPullConsumer not running, pullTask
exit.");
return;
}
ProcessQueue processQueue =
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().get(messageQueue);
if (processQueue == null || processQueue.isDropped()) {
- log.info("ProcessQueue {} dropped, pullTask exit",
messageQueue);
+ LOGGER.info("ProcessQueue {} dropped, pullTask exit",
messageQueue);
return;
}
long offset = localMessageCache.nextPullOffset(messageQueue);
@@ -231,7 +227,7 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
public void onSuccess(PullResult pullResult) {
try {
if
(!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState()))
{
- log.warn("rocketmqPullConsumer not running,
pullTask exit.");
+ LOGGER.warn("rocketmqPullConsumer not running,
pullTask exit.");
return;
}
@@ -248,11 +244,11 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
pullImmediately(PullTask.this);
} else {
localMessageCache.removePullOffset(messageQueue);
- log.info("ProcessQueue {} dropped,
discard the pulled message.", messageQueue);
+ LOGGER.info("ProcessQueue {} dropped,
discard the pulled message.", messageQueue);
}
break;
case OFFSET_ILLEGAL:
- log.warn("The pull request offset is
illegal, offset is {}, message queue is {}, " +
+ LOGGER.warn("The pull request offset is
illegal, offset is {}, message queue is {}, " +
"pull result is {}, delay
{} ms for next pull",
offset, messageQueue, pullResult,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
localMessageCache.updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
@@ -260,16 +256,16 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
- log.info("No NEW_MSG or MATCHED_MSG for
mq:{}, pull again.", messageQueue);
+ LOGGER.info("No NEW_MSG or MATCHED_MSG for
mq:{}, pull again.", messageQueue);
localMessageCache.updatePullOffset(messageQueue,
pullResult.getNextBeginOffset());
pullImmediately(PullTask.this);
break;
default:
- log.warn("Failed to process pullResult,
mq:{} {}", messageQueue, pullResult);
+ LOGGER.warn("Failed to process pullResult,
mq:{} {}", messageQueue, pullResult);
break;
}
} catch (Throwable t) {
- log.error("Exception occurs when process
pullResult", t);
+ LOGGER.error("Exception occurs when process
pullResult", t);
pullLater(PullTask.this,
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
}
}
@@ -282,13 +278,13 @@ public class LitePullConsumerImpl implements
LitePullConsumer {
} else {
delayTimeMillis =
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
}
- log.error("Exception happens when pull message
process, delay {} ms for message queue {}",
+ LOGGER.error("Exception happens when pull message
process, delay {} ms for message queue {}",
delayTimeMillis, messageQueue, e);
pullLater(PullTask.this, delayTimeMillis,
TimeUnit.MILLISECONDS);
}
});
} catch (Throwable t) {
- log.error("Error occurs when pull message process, delay {} ms
for message queue {}",
+ LOGGER.error("Error occurs when pull message process, delay {}
ms for message queue {}",
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, messageQueue, t);
pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION,
TimeUnit.MILLISECONDS);
}
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
index f3e3617..131e6fc 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/consumer/LocalMessageCache.java
@@ -37,10 +37,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-/**
- * @Author changfeng
- * @Date 2023/4/9 10:06 上午
- */
public class LocalMessageCache {
private static final Logger log =
LoggerFactory.getLogger(LocalMessageCache.class);
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
index 61000fd..aae17c2 100644
---
a/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
+++
b/common/src/main/java/org/apache/rocketmq/eventbridge/enums/PushRetryStrategyEnum.java
@@ -24,8 +24,9 @@ public enum PushRetryStrategyEnum {
* 3 times: every 10s~20s
*/
BACKOFF_RETRY(1, 3),
+
/**
- * 176 times: 1,2,4,8,16,32,64,128,256,512,512...512秒 ... 512s(176)
+ * 176 times: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 512...512s(176)
*/
EXPONENTIAL_DECAY_RETRY(2, 176);
diff --git
a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
index 739ecbe..e9e578d 100644
---
a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
+++
b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/trace/TraceStrategy.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.eventbridge.infrastructure.trace;
/**
- * EventMeshTraceService
- * SPI可扩展
- * 基于OpenTelemetry实现封装不同追踪器
+ * Offers extension capability via SPI, allowing different tracing/metrics
observation implementations: OpenTelemetry,
+ * Jaeger, Zipkin, etc.
*/
public interface TraceStrategy {
diff --git
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
index e6f81c2..0ed31ba 100644
---
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
+++
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/ValidateFilter.java
@@ -43,27 +43,27 @@ public class ValidateFilter implements WebFilter {
private List<AuthValidation> validations = new CopyOnWriteArrayList<>();
- @Value(value="${auth.validation:default}")
+ @Value(value = "${auth.validation:default}")
private String validationName;
@PostConstruct
public void init() {
List<String> validationNames =
Arrays.stream(validationName.split(",")).collect(Collectors.toList());
- boolean match =
Arrays.stream(validationName.split(",")).allMatch(validationName->
validationName.equals("default"));
+ boolean match =
Arrays.stream(validationName.split(",")).allMatch(validationName ->
validationName.equals("default"));
if (!match) {
validationNames.add(0, "default");
}
-
validationNames.forEach(action->validations.add(ValidationServiceFactory.getInstance(action)));
+ validationNames.forEach(action ->
validations.add(ValidationServiceFactory.getInstance(action)));
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain)
{
ServerHttpRequest request = exchange.getRequest();
return chain.filter(exchange)
- .subscriberContext(ctx -> {
- AtomicReference<Context> result = new
AtomicReference<Context>();
- validations.forEach(validation->
result.set(validation.validate(request, ctx)));
- return result.get();
- });
+ .subscriberContext(ctx -> {
+ AtomicReference<Context> result = new
AtomicReference<Context>();
+ validations.forEach(validation ->
result.set(validation.validate(request, ctx)));
+ return result.get();
+ });
}
}