This is an automated email from the ASF dual-hosted git repository.
xiatian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 50a36aadf [ISSUE #4788] Support disruptor as memory queue (#4844)
50a36aadf is described below
commit 50a36aadf3a13b218743bf0d4a8707540f7c051d
Author: Jevin Jiang <[email protected]>
AuthorDate: Mon Jul 15 16:06:09 2024 +0800
[ISSUE #4788] Support disruptor as memory queue (#4844)
* [ISSUE #4788] Support disruptor as memory queue
* [ISSUE #4788] fix code style
---------
Co-authored-by: JiangShuJu <[email protected]>
---
.../eventmesh-storage-standalone/build.gradle | 1 +
.../storage/standalone/admin/StandaloneAdmin.java | 26 +----
.../storage/standalone/broker/Channel.java | 105 +++++++++++++++++++
.../standalone/broker/StandaloneBroker.java | 113 ++++++++++-----------
.../standalone/broker/model/MessageEntity.java | 8 ++
.../broker/provider/DisruptorProvider.java | 109 ++++++++++++++++++++
.../storage/standalone/broker/task/Subscribe.java | 87 ++++++++--------
.../standalone/broker/task/SubscribeTask.java | 51 ----------
.../standalone/consumer/StandaloneConsumer.java | 16 +--
.../eventmesh/storage/standalone/TestUtils.java | 31 +++++-
.../standalone/admin/StandaloneAdminTest.java | 39 +++----
.../standalone/broker/StandaloneBrokerTest.java | 51 +++-------
.../standalone/broker/task/SubscribeTest.java | 10 +-
.../producer/StandaloneProducerTest.java | 8 ++
14 files changed, 385 insertions(+), 270 deletions(-)
diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
b/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
index a8b1827aa..22271fb57 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/build.gradle
@@ -18,6 +18,7 @@
dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
+ implementation "com.lmax:disruptor"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
index 7f5ab2da6..72257647a 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdmin.java
@@ -19,7 +19,7 @@ package org.apache.eventmesh.storage.standalone.admin;
import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;
-import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
+import org.apache.eventmesh.storage.standalone.broker.Channel;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
@@ -42,11 +42,11 @@ public class StandaloneAdmin extends AbstractAdmin {
@Override
public List<TopicProperties> getTopic() throws Exception {
- ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer =
this.standaloneBroker.getMessageContainer();
+ ConcurrentHashMap<TopicMetadata, Channel> messageContainer =
this.standaloneBroker.getMessageContainer();
List<TopicProperties> topicList = new ArrayList<>();
messageContainer.keySet().forEach(topicMetadata -> {
- MessageQueue messageQueue = messageContainer.get(topicMetadata);
- final int messageCount = messageQueue.getPutIndex() -
messageQueue.getTakeIndex();
+ Channel channel = messageContainer.get(topicMetadata);
+ final int messageCount = channel.getMessageCount();
topicList.add(new TopicProperties(
topicMetadata.getTopicName(),
messageCount));
@@ -65,25 +65,7 @@ public class StandaloneAdmin extends AbstractAdmin {
standaloneBroker.deleteTopicIfExist(topicName);
}
- @Override
- public List<CloudEvent> getEvent(String topicName, int offset, int length)
throws Exception {
- if (!this.standaloneBroker.checkTopicExist(topicName)) {
- throw new Exception("The topic name doesn't exist in the message
queue");
- }
- ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer =
this.standaloneBroker.getMessageContainer();
- long topicOffset = messageContainer.get(new
TopicMetadata(topicName)).getTakeIndex();
- List<CloudEvent> messageList = new ArrayList<>();
- for (int index = 0; index < length; index++) {
- long messageOffset = topicOffset + offset + index;
- CloudEvent event = this.standaloneBroker.getMessage(topicName,
messageOffset);
- if (event == null) {
- break;
- }
- messageList.add(event);
- }
- return messageList;
- }
@Override
public void publish(CloudEvent cloudEvent) throws Exception {
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
new file mode 100644
index 000000000..2ea7310b8
--- /dev/null
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.storage.standalone.broker;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
+import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
+import
org.apache.eventmesh.storage.standalone.broker.provider.DisruptorProvider;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.IgnoreExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+import lombok.Getter;
+
+
+public class Channel implements LifeCycle {
+
+ public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
+ @Getter
+ private DisruptorProvider provider;
+ private final Integer size;
+ private final EventHandler<MessageEntity> eventHandler;
+ private volatile boolean started = false;
+ private final TopicMetadata topic;
+ private static final String THREAD_NAME_PREFIX =
"standalone_disruptor_provider_";
+
+ public Channel(TopicMetadata topic, EventHandler<MessageEntity>
eventHandler) {
+ this(DEFAULT_SIZE, topic, eventHandler);
+ }
+
+
+ public Channel(final Integer ringBufferSize, final TopicMetadata topic,
final EventHandler<MessageEntity> eventHandler) {
+ this.size = ringBufferSize;
+ this.topic = topic;
+ this.eventHandler = eventHandler;
+ }
+
+
+ @Override
+ public boolean isStarted() {
+ return started;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return !isStarted();
+ }
+
+ public synchronized void start() {
+ if (isClosed()) {
+ doStart();
+ started = true;
+ }
+ }
+
+ public void doStart() {
+ Disruptor<MessageEntity> disruptor = new Disruptor<>(
+ MessageEntity::new,
+ size,
+ new EventMeshThreadFactory(THREAD_NAME_PREFIX +
topic.getTopicName(), true),
+ ProducerType.MULTI,
+ new BlockingWaitStrategy()
+ );
+
+ disruptor.handleEventsWith(eventHandler);
+ disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
+ RingBuffer<MessageEntity> ringBuffer = disruptor.getRingBuffer();
+ provider = new DisruptorProvider(ringBuffer, disruptor);
+ provider.start();
+ }
+
+ public int getMessageCount() {
+ return provider.getMessageCount();
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (isStarted()) {
+ provider.shutdown();
+ provider = null;
+ started = false;
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
index 5e64b40a7..8654b2d1c 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
@@ -19,42 +19,36 @@ package org.apache.eventmesh.storage.standalone.broker;
import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
-import org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClear;
-import
org.apache.eventmesh.storage.standalone.broker.task.HistoryMessageClearTask;
-
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import io.cloudevents.CloudEvent;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
/**
* This broker used to store event, it just support standalone mode, you
shouldn't use this module in production environment
*/
+@Slf4j
public class StandaloneBroker {
- private final ConcurrentHashMap<TopicMetadata, MessageQueue>
messageContainer;
+ // message source by topic
+ @Getter
+ private final ConcurrentHashMap<TopicMetadata, Channel> messageContainer;
- // todo: move the offset manage to consumer
- private final ConcurrentHashMap<TopicMetadata, AtomicLong> offsetMap;
+ @Getter
+ private final ConcurrentHashMap<TopicMetadata, Subscribe>
subscribeContainer;
private StandaloneBroker() {
this.messageContainer = new ConcurrentHashMap<>();
- this.offsetMap = new ConcurrentHashMap<>();
- startHistoryMessageCleanTask();
- }
-
- public ConcurrentHashMap<TopicMetadata, MessageQueue>
getMessageContainer() {
- return this.messageContainer;
+ this.subscribeContainer = new ConcurrentHashMap<>();
}
- public ConcurrentHashMap<TopicMetadata, AtomicLong> getOffsetMap() {
- return this.offsetMap;
- }
public static StandaloneBroker getInstance() {
- return StandaloneBrokerInstanceHolder.instance;
+ return StandaloneBrokerInstanceHolder.INSTANCE;
}
/**
@@ -62,28 +56,38 @@ public class StandaloneBroker {
*
* @param topicName topic name
* @param message message
- * @throws InterruptedException
*/
- public MessageEntity putMessage(String topicName, CloudEvent message)
throws InterruptedException {
- Pair<MessageQueue, AtomicLong> pair = createTopicIfAbsent(topicName);
- AtomicLong topicOffset = pair.getRight();
- MessageQueue messageQueue = pair.getLeft();
-
- MessageEntity messageEntity = new MessageEntity(
- new TopicMetadata(topicName), message,
topicOffset.getAndIncrement(), System.currentTimeMillis());
- messageQueue.put(messageEntity);
-
+ public MessageEntity putMessage(String topicName, CloudEvent message) {
+ TopicMetadata topicMetadata = new TopicMetadata(topicName);
+ if (!messageContainer.containsKey(topicMetadata)) {
+ createTopic(topicName);
+ }
+ Channel channel = messageContainer.get(topicMetadata);
+ MessageEntity messageEntity = new MessageEntity(new
TopicMetadata(topicName), message);
+ channel.getProvider().onData(messageEntity);
return messageEntity;
}
+ public Channel createTopic(String topicName) {
+ TopicMetadata topicMetadata = new TopicMetadata(topicName);
+ return messageContainer.computeIfAbsent(topicMetadata, k -> {
+ Subscribe subscribe = subscribeContainer.get(topicMetadata);
+ if (subscribe == null) {
+ throw new IllegalStateException("the topic not exist subscribe
");
+ }
+ Channel channel = new Channel(topicMetadata, subscribe);
+ channel.start();
+ return channel;
+ });
+ }
+
/**
* Get the message, if the queue is empty then await
*
* @param topicName
*/
public CloudEvent takeMessage(String topicName) throws
InterruptedException {
- TopicMetadata topicMetadata = new TopicMetadata(topicName);
- return messageContainer.computeIfAbsent(topicMetadata, k -> new
MessageQueue()).take().getMessage();
+ return null;
}
/**
@@ -92,12 +96,7 @@ public class StandaloneBroker {
* @param topicName
*/
public CloudEvent getMessage(String topicName) {
- TopicMetadata topicMetadata = new TopicMetadata(topicName);
- MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k
-> new MessageQueue()).getHead();
- if (head == null) {
- return null;
- }
- return head.getMessage();
+ return null;
}
/**
@@ -108,21 +107,9 @@ public class StandaloneBroker {
* @return CloudEvent
*/
public CloudEvent getMessage(String topicName, long offset) {
- TopicMetadata topicMetadata = new TopicMetadata(topicName);
- MessageEntity messageEntity =
messageContainer.computeIfAbsent(topicMetadata, k -> new
MessageQueue()).getByOffset(offset);
- if (messageEntity == null) {
- return null;
- }
- return messageEntity.getMessage();
+ return null;
}
- private void startHistoryMessageCleanTask() {
- HistoryMessageClear historyMessageClear = new
HistoryMessageClear(messageContainer);
- Thread thread = new Thread(new
HistoryMessageClearTask(historyMessageClear));
- thread.setDaemon(true);
- thread.setName("StandaloneBroker-HistoryMessageCleanTask");
- thread.start();
- }
public boolean checkTopicExist(String topicName) {
return messageContainer.containsKey(new TopicMetadata(topicName));
@@ -132,13 +119,10 @@ public class StandaloneBroker {
* if the topic does not exist, create the topic
*
* @param topicName topicName
- * @return messageQueue and offset
+ * @return Channel
*/
- public Pair<MessageQueue, AtomicLong> createTopicIfAbsent(String
topicName) {
- TopicMetadata topicMetadata = new TopicMetadata(topicName);
- MessageQueue messageQueue =
messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue());
- AtomicLong offset = offsetMap.computeIfAbsent(topicMetadata, k -> new
AtomicLong());
- return Pair.of(messageQueue, offset);
+ public Channel createTopicIfAbsent(String topicName) {
+ return createTopic(topicName);
}
/**
@@ -148,18 +132,23 @@ public class StandaloneBroker {
*/
public void deleteTopicIfExist(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
+ Channel channel = createTopicIfAbsent(topicName);
+ channel.shutdown();
messageContainer.remove(topicMetadata);
}
- public void updateOffset(TopicMetadata topicMetadata, long offset) {
- offsetMap.computeIfPresent(topicMetadata, (k, v) -> {
- v.set(offset);
- return v;
- });
+ public void subscribed(String topicName, Subscribe subscribe) {
+ TopicMetadata topicMetadata = new TopicMetadata(topicName);
+ if (getMessageContainer().containsKey(topicMetadata)) {
+ log.warn("the topic already subscribed");
+ return;
+ }
+ subscribeContainer.put(topicMetadata, subscribe);
}
+
private static class StandaloneBrokerInstanceHolder {
- private static final StandaloneBroker instance = new
StandaloneBroker();
+ private static final StandaloneBroker INSTANCE = new
StandaloneBroker();
}
-}
+}
\ No newline at end of file
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
index 0f437aee0..3662b3025 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/model/MessageEntity.java
@@ -21,6 +21,9 @@ import java.io.Serializable;
import io.cloudevents.CloudEvent;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
public class MessageEntity implements Serializable {
private static final long serialVersionUID = 6646148767540524786L;
@@ -40,6 +43,11 @@ public class MessageEntity implements Serializable {
this.createTimeMills = currentTimeMills;
}
+ public MessageEntity(TopicMetadata topicMetadata, CloudEvent message) {
+ this.topicMetadata = topicMetadata;
+ this.message = message;
+ }
+
public TopicMetadata getTopicMetadata() {
return topicMetadata;
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java
new file mode 100644
index 000000000..47b2665a2
--- /dev/null
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/provider/DisruptorProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.storage.standalone.broker.provider;
+
+import org.apache.eventmesh.api.LifeCycle;
+import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
+
+import com.lmax.disruptor.EventTranslatorOneArg;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * DisruptorProvider. disruptor provider definition.
+ */
+@Slf4j
+public class DisruptorProvider implements LifeCycle {
+
+ private final RingBuffer<MessageEntity> ringBuffer;
+
+ private final Disruptor<MessageEntity> disruptor;
+
+ private volatile boolean start = false;
+
+ private final EventTranslatorOneArg<MessageEntity, MessageEntity>
translatorOneArg = (messageEntity, sequence, arg0) -> {
+ arg0.setOffset(sequence);
+ arg0.setCreateTimeMills(System.currentTimeMillis());
+ messageEntity.setOffset(arg0.getOffset());
+ messageEntity.setCreateTimeMills(arg0.getCreateTimeMills());
+ messageEntity.setTopicMetadata(arg0.getTopicMetadata());
+ messageEntity.setMessage(arg0.getMessage());
+ };
+
+
+ /**
+ * Instantiates a new Disruptor provider.
+ *
+ * @param ringBuffer the ring buffer
+ * @param disruptor the disruptor
+ */
+ public DisruptorProvider(final RingBuffer<MessageEntity> ringBuffer, final
Disruptor<MessageEntity> disruptor) {
+ this.ringBuffer = ringBuffer;
+ this.disruptor = disruptor;
+ }
+
+ /**
+ * @param data the data
+ */
+ public MessageEntity onData(final MessageEntity data) {
+ if (isClosed()) {
+ throw new IllegalArgumentException("the disruptor is close");
+ }
+ try {
+ ringBuffer.publishEvent(translatorOneArg, data);
+ } catch (Exception ex) {
+ throw new IllegalStateException("send data fail.");
+ }
+ return data;
+ }
+
+
+ @Override
+ public boolean isStarted() {
+ return start;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return !isStarted();
+ }
+
+ @Override
+ public void start() {
+ if (null != disruptor) {
+ disruptor.start();
+ start = true;
+ }
+ }
+
+ /**
+ * Shutdown.
+ */
+ public void shutdown() {
+ if (null != disruptor) {
+ disruptor.shutdown();
+ start = false;
+ }
+ }
+
+ public int getMessageCount() {
+ return ringBuffer.getBufferSize();
+ }
+}
\ No newline at end of file
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
index 8316270ad..4c84849ac 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/Subscribe.java
@@ -21,16 +21,19 @@ import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
-import java.util.concurrent.atomic.AtomicInteger;
import io.cloudevents.CloudEvent;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.WorkHandler;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class Subscribe {
+public class Subscribe implements WorkHandler<MessageEntity>,
EventHandler<MessageEntity> {
@Getter
private final String topicName;
@@ -38,8 +41,6 @@ public class Subscribe {
private final EventListener listener;
@Getter
private volatile boolean isRunning;
- @Getter
- private AtomicInteger offset;
public Subscribe(String topicName,
StandaloneBroker standaloneBroker,
@@ -51,52 +52,50 @@ public class Subscribe {
}
public void subscribe() {
+ standaloneBroker.subscribed(topicName, this);
+ }
+
+ public void shutdown() {
+ isRunning = false;
+ standaloneBroker.deleteTopicIfExist(topicName);
+ }
+
+ @Override
+ public void onEvent(MessageEntity event, long sequence, boolean
endOfBatch) {
+ onEvent(event);
+ }
+
+ @Override
+ public void onEvent(MessageEntity event) {
try {
- log.debug("execute subscribe task, topic: {}, offset: {}",
topicName, offset);
- if (offset == null) {
- CloudEvent message = standaloneBroker.getMessage(topicName);
- if (message != null) {
- Object tmpOffset = message.getExtension("offset");
- if (tmpOffset instanceof Integer) {
- offset = new
AtomicInteger(Integer.parseInt(tmpOffset.toString()));
- } else {
- offset = new AtomicInteger(0);
- }
- }
+ if (!isRunning) {
+ return;
}
- if (offset != null) {
- CloudEvent message = standaloneBroker.getMessage(topicName,
offset.get());
- if (message != null) {
- EventMeshAsyncConsumeContext consumeContext = new
EventMeshAsyncConsumeContext() {
+ CloudEvent message = event.getMessage();
+ if (message != null) {
+ EventMeshAsyncConsumeContext consumeContext = new
EventMeshAsyncConsumeContext() {
- @Override
- public void commit(EventMeshAction action) {
- switch (action) {
- case CommitMessage:
- // update offset
- log.info("message commit, topic: {},
current offset:{}", topicName, offset.get());
- break;
- case ManualAck:
- // update offset
- offset.incrementAndGet();
- log.info("message ack, topic: {}, current
offset:{}", topicName, offset.get());
- break;
- case ReconsumeLater:
- default:
-
- }
+ @Override
+ public void commit(EventMeshAction action) {
+ switch (action) {
+ case CommitMessage:
+ // update offset
+ log.info("message commit, topic: {}, current
offset:{}", topicName, event.getOffset());
+ break;
+ case ManualAck:
+ // update offset
+ log.info("message ack, topic: {}, current
offset:{}", topicName, event.getOffset());
+ break;
+ case ReconsumeLater:
+ default:
}
- };
- listener.consume(message, consumeContext);
- }
+ }
+ };
+ listener.consume(message, consumeContext);
}
} catch (Exception ex) {
- log.error("consumer error, topic: {}, offset: {}", topicName,
offset == null ? null : offset.get(), ex);
+ log.error("consumer error, topic: {}, offset: {}", topicName,
event.getOffset(), ex);
}
}
- public void shutdown() {
- isRunning = false;
- }
-
-}
+}
\ No newline at end of file
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java
deleted file mode 100644
index 0936c7925..000000000
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.storage.standalone.broker.task;
-
-import org.apache.eventmesh.common.utils.ThreadUtils;
-
-import java.util.concurrent.TimeUnit;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class SubscribeTask implements Runnable {
-
- private Subscribe subscribe;
-
- public SubscribeTask(Subscribe subscribe) {
- this.subscribe = subscribe;
- }
-
- @Override
- public void run() {
- while (subscribe.isRunning()) {
- subscribe.subscribe();
- try {
- ThreadUtils.sleepWithThrowException(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.error("Thread is interrupted, topic: {}, offset: {} thread
name: {}",
- subscribe.getTopicName(),
- subscribe.getOffset() == null ? null :
subscribe.getOffset().get(),
- Thread.currentThread().getName(), e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
-}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
index 9eb753e3f..edb66703f 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/consumer/StandaloneConsumer.java
@@ -20,17 +20,12 @@ package org.apache.eventmesh.storage.standalone.consumer;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.consumer.Consumer;
-import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
-import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
-import org.apache.eventmesh.storage.standalone.broker.task.SubscribeTask;
import java.util.List;
-import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import io.cloudevents.CloudEvent;
@@ -45,16 +40,10 @@ public class StandaloneConsumer implements Consumer {
private final ConcurrentHashMap<String, Subscribe> subscribeTable;
- private final ExecutorService consumeExecutorService;
-
public StandaloneConsumer(Properties properties) {
this.standaloneBroker = StandaloneBroker.getInstance();
this.subscribeTable = new ConcurrentHashMap<>(16);
this.isStarted = new AtomicBoolean(false);
- this.consumeExecutorService =
ThreadPoolFactory.createThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors() * 2,
- Runtime.getRuntime().availableProcessors() * 2,
- "StandaloneConsumerThread");
}
@Override
@@ -86,8 +75,6 @@ public class StandaloneConsumer implements Consumer {
@Override
public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext
context) {
- cloudEvents.forEach(cloudEvent -> standaloneBroker.updateOffset(
- new TopicMetadata(cloudEvent.getSubject()),
Objects.requireNonNull((Long) cloudEvent.getExtension("offset"))));
}
@@ -99,9 +86,8 @@ public class StandaloneConsumer implements Consumer {
synchronized (subscribeTable) {
standaloneBroker.createTopicIfAbsent(topic);
Subscribe subscribe = new Subscribe(topic, standaloneBroker,
listener);
- SubscribeTask subScribeTask = new SubscribeTask(subscribe);
+ subscribe.subscribe();
subscribeTable.put(topic, subscribe);
- consumeExecutorService.execute(subScribeTask);
}
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
index 5ea0ab6f1..0c16aabb3 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
@@ -17,9 +17,14 @@
package org.apache.eventmesh.storage.standalone;
+import org.apache.eventmesh.storage.standalone.broker.Channel;
import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
+import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
+
+import org.apache.commons.lang3.tuple.Pair;
import java.net.URI;
import java.util.Collections;
@@ -29,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
+
public class TestUtils {
public static final String TEST_TOPIC = "test-topic";
@@ -36,12 +42,18 @@ public class TestUtils {
public static final int LENGTH = 5;
public static final int EXCEEDED_MESSAGE_STORE_WINDOW = 60 * 60 * 1000 +
1000;
- public static ConcurrentHashMap<TopicMetadata, MessageQueue>
createDefaultMessageContainer() {
- ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = new
ConcurrentHashMap<>(1);
- messageContainer.put(new TopicMetadata(TEST_TOPIC), new
MessageQueue());
- return messageContainer;
+ public static Pair<ConcurrentHashMap<TopicMetadata, Channel>,
ConcurrentHashMap<TopicMetadata, Subscribe>> createDefaultMessageContainer(
+ StandaloneBroker broker) {
+ ConcurrentHashMap<TopicMetadata, Channel> messageContainer = new
ConcurrentHashMap<>(1);
+ ConcurrentHashMap<TopicMetadata, Subscribe> subscribeContainer = new
ConcurrentHashMap<>(1);
+
+ Subscribe subscribe = createSubscribe(broker);
+ subscribe.subscribe();
+ subscribeContainer.put(new TopicMetadata(TEST_TOPIC), subscribe);
+ return Pair.of(messageContainer, subscribeContainer);
}
+
public static ConcurrentHashMap<TopicMetadata, MessageQueue>
createMessageContainer(TopicMetadata topicMetadata, MessageEntity messageEntity)
throws InterruptedException {
ConcurrentHashMap<TopicMetadata, MessageQueue> messageContainer = new
ConcurrentHashMap<>(1);
@@ -79,4 +91,15 @@ public class TestUtils {
offSet,
currentTimeMillis);
}
+
+ public static Subscribe createSubscribe(StandaloneBroker standaloneBroker)
{
+ return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent,
context) -> {
+ });
+ }
+
+ public static Subscribe createSubscribe(StandaloneBroker standaloneBroker,
List<CloudEvent> cloudEvents) {
+ return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent,
context) -> {
+ cloudEvents.add(cloudEvent);
+ });
+ }
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
index 2d84df265..7200f902e 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/admin/StandaloneAdminTest.java
@@ -17,18 +17,20 @@
package org.apache.eventmesh.storage.standalone.admin;
-import static org.apache.eventmesh.storage.standalone.TestUtils.LENGTH;
-import static org.apache.eventmesh.storage.standalone.TestUtils.OFF_SET;
import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
import static
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent;
import static
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultMessageContainer;
import static
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultMessageEntity;
-import org.apache.eventmesh.api.admin.TopicProperties;
+import org.apache.eventmesh.storage.standalone.broker.Channel;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
+import org.apache.eventmesh.storage.standalone.broker.model.TopicMetadata;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
-import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.concurrent.ConcurrentHashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -52,6 +54,7 @@ public class StandaloneAdminTest {
private StandaloneAdmin standaloneAdmin;
+
@BeforeEach
public void setUp() {
initStaticInstance();
@@ -69,13 +72,6 @@ public class StandaloneAdminTest {
Assertions.assertTrue(standaloneAdmin.isClosed());
}
- @Test
- public void testGetTopic() throws Exception {
- List<TopicProperties> topicPropertiesList = standaloneAdmin.getTopic();
- Assertions.assertNotNull(topicPropertiesList);
- Assertions.assertFalse(topicPropertiesList.isEmpty());
- }
-
@Test
public void testCreateTopic() {
standaloneAdmin.createTopic(TEST_TOPIC);
@@ -88,21 +84,6 @@ public class StandaloneAdminTest {
Mockito.verify(standaloneBroker).deleteTopicIfExist(TEST_TOPIC);
}
- @Test
- public void testGetEvent() throws Exception {
-
Mockito.when(standaloneBroker.checkTopicExist(TEST_TOPIC)).thenReturn(Boolean.TRUE);
- Mockito.when(standaloneBroker.getMessage(TEST_TOPIC,
OFF_SET)).thenReturn(createDefaultCloudEvent());
- List<CloudEvent> events = standaloneAdmin.getEvent(TEST_TOPIC,
OFF_SET, LENGTH);
- Assertions.assertNotNull(events);
- Assertions.assertFalse(events.isEmpty());
- }
-
- @Test
- public void testGetEvent_throwException() {
-
Mockito.when(standaloneBroker.checkTopicExist(TEST_TOPIC)).thenReturn(Boolean.FALSE);
- Exception exception = Assertions.assertThrows(Exception.class, () ->
standaloneAdmin.getEvent(TEST_TOPIC, OFF_SET, LENGTH));
- Assertions.assertEquals("The topic name doesn't exist in the message
queue", exception.getMessage());
- }
@Test
public void testPublish() throws Exception {
@@ -116,7 +97,11 @@ public class StandaloneAdminTest {
private void initStaticInstance() {
try (MockedStatic<StandaloneBroker> standaloneBrokerMockedStatic =
Mockito.mockStatic(StandaloneBroker.class)) {
standaloneBrokerMockedStatic.when(StandaloneBroker::getInstance).thenReturn(standaloneBroker);
-
Mockito.when(standaloneBroker.getMessageContainer()).thenReturn(createDefaultMessageContainer());
+ Pair<ConcurrentHashMap<TopicMetadata, Channel>,
ConcurrentHashMap<TopicMetadata, Subscribe>> pair =
+ createDefaultMessageContainer(standaloneBroker);
+
Mockito.when(standaloneBroker.getSubscribeContainer()).thenReturn(pair.getRight());
+
Mockito.when(standaloneBroker.getMessageContainer()).thenReturn(pair.getLeft());
+
standaloneAdmin = new StandaloneAdmin();
}
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
index 3582f95ef..6d84cb780 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
@@ -17,15 +17,12 @@
package org.apache.eventmesh.storage.standalone.broker;
-import static org.apache.eventmesh.storage.standalone.TestUtils.OFF_SET;
import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
import static
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent;
+import static
org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe;
import org.apache.eventmesh.storage.standalone.broker.model.MessageEntity;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -34,6 +31,14 @@ import io.cloudevents.CloudEvent;
public class StandaloneBrokerTest {
+
+ public StandaloneBroker getStandaloneBroker() {
+ StandaloneBroker instance = StandaloneBroker.getInstance();
+ Subscribe subscribe = createSubscribe(instance);
+ subscribe.subscribe();
+ return instance;
+ }
+
@Test
public void testGetInstance() {
Assertions.assertNotNull(StandaloneBroker.getInstance());
@@ -41,49 +46,23 @@ public class StandaloneBrokerTest {
@Test
public void testCreateTopicIfAbsent() {
- StandaloneBroker instance = StandaloneBroker.getInstance();
- Pair<MessageQueue, AtomicLong> pair =
instance.createTopicIfAbsent(TEST_TOPIC);
+ StandaloneBroker instance = getStandaloneBroker();
+ Channel pair = instance.createTopicIfAbsent(TEST_TOPIC);
Assertions.assertNotNull(pair);
}
@Test
public void testPutMessage() throws InterruptedException {
- StandaloneBroker instance = StandaloneBroker.getInstance();
+ StandaloneBroker instance = getStandaloneBroker();
CloudEvent cloudEvent = createDefaultCloudEvent();
MessageEntity messageEntity = instance.putMessage(TEST_TOPIC,
cloudEvent);
Assertions.assertNotNull(messageEntity);
}
- @Test
- public void testTakeMessage() throws InterruptedException {
- StandaloneBroker instance = StandaloneBroker.getInstance();
- CloudEvent cloudEvent = createDefaultCloudEvent();
- instance.putMessage(TEST_TOPIC, cloudEvent);
- CloudEvent message = instance.takeMessage(TEST_TOPIC);
- Assertions.assertNotNull(message);
- }
-
- @Test
- public void testGetMessage() throws InterruptedException {
- StandaloneBroker instance = StandaloneBroker.getInstance();
- CloudEvent cloudEvent = createDefaultCloudEvent();
- instance.putMessage(TEST_TOPIC, cloudEvent);
- CloudEvent cloudEventResult = instance.getMessage(TEST_TOPIC);
- Assertions.assertNotNull(cloudEventResult);
- }
-
- @Test
- public void testMessageWithOffSet() throws InterruptedException {
- StandaloneBroker instance = StandaloneBroker.getInstance();
- CloudEvent cloudEvent = createDefaultCloudEvent();
- instance.putMessage(TEST_TOPIC, cloudEvent);
- CloudEvent cloudEventResult = instance.getMessage(TEST_TOPIC, OFF_SET);
- Assertions.assertNotNull(cloudEventResult);
- }
@Test
public void testCheckTopicExist() throws InterruptedException {
- StandaloneBroker instance = StandaloneBroker.getInstance();
+ StandaloneBroker instance = getStandaloneBroker();
CloudEvent cloudEvent = createDefaultCloudEvent();
instance.putMessage(TEST_TOPIC, cloudEvent);
boolean exists = instance.checkTopicExist(TEST_TOPIC);
@@ -92,7 +71,7 @@ public class StandaloneBrokerTest {
@Test
public void testDeleteTopicIfExist() throws InterruptedException {
- StandaloneBroker instance = StandaloneBroker.getInstance();
+ StandaloneBroker instance = getStandaloneBroker();
CloudEvent cloudEvent = createDefaultCloudEvent();
instance.putMessage(TEST_TOPIC, cloudEvent);
instance.deleteTopicIfExist(TEST_TOPIC);
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
index bc11c9b0a..3ef86bdd2 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/task/SubscribeTest.java
@@ -18,14 +18,11 @@
package org.apache.eventmesh.storage.standalone.broker.task;
import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
-import static
org.apache.eventmesh.storage.standalone.TestUtils.createDefaultCloudEvent;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.eventmesh.api.EventListener;
-import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.junit.jupiter.api.Assertions;
@@ -35,8 +32,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import io.cloudevents.CloudEvent;
-
@ExtendWith(MockitoExtension.class)
public class SubscribeTest {
@@ -48,12 +43,9 @@ public class SubscribeTest {
@Test
public void testSubscribe() {
- CloudEvent cloudEvent = createDefaultCloudEvent();
-
Mockito.when(standaloneBroker.getMessage(anyString())).thenReturn(cloudEvent);
- Mockito.when(standaloneBroker.getMessage(anyString(),
anyLong())).thenReturn(cloudEvent);
subscribe = new Subscribe(TEST_TOPIC, standaloneBroker, eventListener);
subscribe.subscribe();
- Mockito.verify(eventListener).consume(any(CloudEvent.class),
any(EventMeshAsyncConsumeContext.class));
+ Mockito.verify(standaloneBroker).subscribed(anyString(),
any(Subscribe.class));
}
@Test
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
index 37cdc02c6..4bfee4976 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
@@ -17,8 +17,11 @@
package org.apache.eventmesh.storage.standalone.producer;
+import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
+
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.storage.standalone.TestUtils;
+import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import java.util.Properties;
@@ -28,10 +31,13 @@ import org.junit.jupiter.api.Test;
import io.cloudevents.CloudEvent;
+
+
public class StandaloneProducerTest {
private StandaloneProducer standaloneProducer;
+
@BeforeEach
public void setUp() {
standaloneProducer = new StandaloneProducer(new Properties());
@@ -61,6 +67,8 @@ public class StandaloneProducerTest {
@Test
public void testPublish() {
+ StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
+ standaloneBroker.createTopicIfAbsent(TEST_TOPIC);
CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent();
SendResult sendResult = standaloneProducer.publish(cloudEvent);
Assertions.assertNotNull(sendResult);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]