This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 831fd72ea [ISSUE #5127] fix create topic error in Standalone mode
(#5128)
831fd72ea is described below
commit 831fd72ea2d6e600fb3c530ff0240750c8976f82
Author: Jevin Jiang <[email protected]>
AuthorDate: Thu Dec 5 22:20:12 2024 +0800
[ISSUE #5127] fix create topic error in Standalone mode (#5128)
* [ISSUE #5127] fix
* [ISSUE #5127] fix
* [ISSUE #5127] fix
* [ISSUE #5127] fix
* [ISSUE #5127] fix checkstyle test
---------
Co-authored-by: JiangShuJu <[email protected]>
---
.../storage/standalone/broker/Channel.java | 8 ++++++-
.../standalone/broker/StandaloneBroker.java | 26 ++++++++++++----------
.../eventmesh/storage/standalone/TestUtils.java | 2 ++
.../standalone/broker/StandaloneBrokerTest.java | 9 --------
.../producer/StandaloneProducerTest.java | 4 ++++
5 files changed, 27 insertions(+), 22 deletions(-)
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
index 2ea7310b8..8de0ca1c5 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
@@ -31,6 +31,7 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Getter;
+import lombok.Setter;
public class Channel implements LifeCycle {
@@ -39,11 +40,16 @@ public class Channel implements LifeCycle {
@Getter
private DisruptorProvider provider;
private final Integer size;
- private final EventHandler<MessageEntity> eventHandler;
+ @Setter
+ private EventHandler<MessageEntity> eventHandler;
private volatile boolean started = false;
private final TopicMetadata topic;
private static final String THREAD_NAME_PREFIX =
"standalone_disruptor_provider_";
+ public Channel(TopicMetadata topic) {
+ this(DEFAULT_SIZE, topic, null);
+ }
+
public Channel(TopicMetadata topic, EventHandler<MessageEntity>
eventHandler) {
this(DEFAULT_SIZE, topic, eventHandler);
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
index 8654b2d1c..0cda57633 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
@@ -60,9 +60,12 @@ public class StandaloneBroker {
public MessageEntity putMessage(String topicName, CloudEvent message) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (!messageContainer.containsKey(topicMetadata)) {
- createTopic(topicName);
+ throw new RuntimeException(String.format("The topic:%s is not
created", topicName));
}
Channel channel = messageContainer.get(topicMetadata);
+ if (channel.isClosed()) {
+ throw new RuntimeException(String.format("The topic:%s is not
subscribed", topicName));
+ }
MessageEntity messageEntity = new MessageEntity(new
TopicMetadata(topicName), message);
channel.getProvider().onData(messageEntity);
return messageEntity;
@@ -70,15 +73,7 @@ public class StandaloneBroker {
public Channel createTopic(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
- return messageContainer.computeIfAbsent(topicMetadata, k -> {
- Subscribe subscribe = subscribeContainer.get(topicMetadata);
- if (subscribe == null) {
- throw new IllegalStateException("the topic not exist subscribe
");
- }
- Channel channel = new Channel(topicMetadata, subscribe);
- channel.start();
- return channel;
- });
+ return messageContainer.computeIfAbsent(topicMetadata, k -> new
Channel(topicMetadata));
}
/**
@@ -139,10 +134,17 @@ public class StandaloneBroker {
public void subscribed(String topicName, Subscribe subscribe) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
- if (getMessageContainer().containsKey(topicMetadata)) {
- log.warn("the topic already subscribed");
+ if (subscribeContainer.containsKey(topicMetadata)) {
+ log.warn("the topic:{} already subscribed", topicName);
+ return;
+ }
+ Channel channel = getMessageContainer().get(topicMetadata);
+ if (channel == null) {
+ log.warn("the topic:{} is not created", topicName);
return;
}
+ channel.setEventHandler(subscribe);
+ channel.start();
subscribeContainer.put(topicMetadata, subscribe);
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
index 0c16aabb3..5571cda95 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
@@ -93,11 +93,13 @@ public class TestUtils {
}
public static Subscribe createSubscribe(StandaloneBroker standaloneBroker)
{
+ standaloneBroker.createTopic(TEST_TOPIC);
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent,
context) -> {
});
}
public static Subscribe createSubscribe(StandaloneBroker standaloneBroker,
List<CloudEvent> cloudEvents) {
+ standaloneBroker.createTopic(TEST_TOPIC);
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent,
context) -> {
cloudEvents.add(cloudEvent);
});
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
index 6d84cb780..d57ba6523 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
@@ -69,13 +69,4 @@ public class StandaloneBrokerTest {
Assertions.assertTrue(exists);
}
- @Test
- public void testDeleteTopicIfExist() throws InterruptedException {
- StandaloneBroker instance = getStandaloneBroker();
- CloudEvent cloudEvent = createDefaultCloudEvent();
- instance.putMessage(TEST_TOPIC, cloudEvent);
- instance.deleteTopicIfExist(TEST_TOPIC);
- boolean exists = instance.checkTopicExist(TEST_TOPIC);
- Assertions.assertFalse(exists);
- }
}
diff --git
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
index 4bfee4976..20db66683 100644
---
a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
+++
b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
@@ -18,10 +18,12 @@
package org.apache.eventmesh.storage.standalone.producer;
import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
+import static
org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.storage.standalone.TestUtils;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
import java.util.Properties;
@@ -70,6 +72,8 @@ public class StandaloneProducerTest {
StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
standaloneBroker.createTopicIfAbsent(TEST_TOPIC);
CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent();
+ Subscribe subscribe = createSubscribe(standaloneBroker);
+ subscribe.subscribe();
SendResult sendResult = standaloneProducer.publish(cloudEvent);
Assertions.assertNotNull(sendResult);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]