This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 831fd72ea [ISSUE #5127] fix create topic error in Standalone mode (#5128)
831fd72ea is described below

commit 831fd72ea2d6e600fb3c530ff0240750c8976f82
Author: Jevin Jiang <[email protected]>
AuthorDate: Thu Dec 5 22:20:12 2024 +0800

    [ISSUE #5127] fix create topic error in Standalone mode (#5128)
    
    * [ISSUE #5127] fix
    
    * [ISSUE #5127] fix
    
    * [ISSUE #5127] fix
    
    * [ISSUE #5127] fix
    
    * [ISSUE #5127] fix checkstyle test
    
    ---------
    
    Co-authored-by: JiangShuJu <[email protected]>
---
 .../storage/standalone/broker/Channel.java         |  8 ++++++-
 .../standalone/broker/StandaloneBroker.java        | 26 ++++++++++++----------
 .../eventmesh/storage/standalone/TestUtils.java    |  2 ++
 .../standalone/broker/StandaloneBrokerTest.java    |  9 --------
 .../producer/StandaloneProducerTest.java           |  4 ++++
 5 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
index 2ea7310b8..8de0ca1c5 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java
@@ -31,6 +31,7 @@ import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
 import lombok.Getter;
+import lombok.Setter;
 
 
 public class Channel implements LifeCycle {
@@ -39,11 +40,16 @@ public class Channel implements LifeCycle {
     @Getter
     private DisruptorProvider provider;
     private final Integer size;
-    private final EventHandler<MessageEntity> eventHandler;
+    @Setter
+    private EventHandler<MessageEntity> eventHandler;
     private volatile boolean started = false;
     private final TopicMetadata topic;
     private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_";
 
+    public Channel(TopicMetadata topic) {
+        this(DEFAULT_SIZE, topic, null);
+    }
+
     public Channel(TopicMetadata topic, EventHandler<MessageEntity> eventHandler) {
         this(DEFAULT_SIZE, topic, eventHandler);
     }
diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
index 8654b2d1c..0cda57633 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java
@@ -60,9 +60,12 @@ public class StandaloneBroker {
     public MessageEntity putMessage(String topicName, CloudEvent message) {
         TopicMetadata topicMetadata = new TopicMetadata(topicName);
         if (!messageContainer.containsKey(topicMetadata)) {
-            createTopic(topicName);
+            throw new RuntimeException(String.format("The topic:%s is not created", topicName));
         }
         Channel channel = messageContainer.get(topicMetadata);
+        if (channel.isClosed()) {
+            throw new RuntimeException(String.format("The topic:%s is not subscribed", topicName));
+        }
         MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message);
         channel.getProvider().onData(messageEntity);
         return messageEntity;
@@ -70,15 +73,7 @@ public class StandaloneBroker {
 
     public Channel createTopic(String topicName) {
         TopicMetadata topicMetadata = new TopicMetadata(topicName);
-        return messageContainer.computeIfAbsent(topicMetadata, k -> {
-            Subscribe subscribe = subscribeContainer.get(topicMetadata);
-            if (subscribe == null) {
-                throw new IllegalStateException("the topic not exist subscribe ");
-            }
-            Channel channel = new Channel(topicMetadata, subscribe);
-            channel.start();
-            return channel;
-        });
+        return messageContainer.computeIfAbsent(topicMetadata, k -> new Channel(topicMetadata));
     }
 
     /**
@@ -139,10 +134,17 @@ public class StandaloneBroker {
 
     public void subscribed(String topicName, Subscribe subscribe) {
         TopicMetadata topicMetadata = new TopicMetadata(topicName);
-        if (getMessageContainer().containsKey(topicMetadata)) {
-            log.warn("the topic already subscribed");
+        if (subscribeContainer.containsKey(topicMetadata)) {
+            log.warn("the topic:{} already subscribed", topicName);
+            return;
+        }
+        Channel channel = getMessageContainer().get(topicMetadata);
+        if (channel == null) {
+            log.warn("the topic:{} is not created", topicName);
             return;
         }
+        channel.setEventHandler(subscribe);
+        channel.start();
         subscribeContainer.put(topicMetadata, subscribe);
     }
 
diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
index 0c16aabb3..5571cda95 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java
@@ -93,11 +93,13 @@ public class TestUtils {
     }
 
     public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) {
+        standaloneBroker.createTopic(TEST_TOPIC);
         return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
         });
     }
 
     public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, List<CloudEvent> cloudEvents) {
+        standaloneBroker.createTopic(TEST_TOPIC);
         return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
             cloudEvents.add(cloudEvent);
         });
diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
index 6d84cb780..d57ba6523 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java
@@ -69,13 +69,4 @@ public class StandaloneBrokerTest {
         Assertions.assertTrue(exists);
     }
 
-    @Test
-    public void testDeleteTopicIfExist() throws InterruptedException {
-        StandaloneBroker instance = getStandaloneBroker();
-        CloudEvent cloudEvent = createDefaultCloudEvent();
-        instance.putMessage(TEST_TOPIC, cloudEvent);
-        instance.deleteTopicIfExist(TEST_TOPIC);
-        boolean exists = instance.checkTopicExist(TEST_TOPIC);
-        Assertions.assertFalse(exists);
-    }
 }
diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
index 4bfee4976..20db66683 100644
--- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
+++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java
@@ -18,10 +18,12 @@
 package org.apache.eventmesh.storage.standalone.producer;
 
 import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
+import static org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe;
 
 import org.apache.eventmesh.api.SendResult;
 import org.apache.eventmesh.storage.standalone.TestUtils;
 import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
+import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;
 
 import java.util.Properties;
 
@@ -70,6 +72,8 @@ public class StandaloneProducerTest {
         StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
         standaloneBroker.createTopicIfAbsent(TEST_TOPIC);
         CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent();
+        Subscribe subscribe = createSubscribe(standaloneBroker);
+        subscribe.subscribe();
         SendResult sendResult = standaloneProducer.publish(cloudEvent);
         Assertions.assertNotNull(sendResult);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to