vongosling closed pull request #73: [ROCKETMQ-135] Broker cannot be properly 
finalized on failure to load…
URL: https://github.com/apache/rocketmq/pull/73
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b656870b9..004dec9d5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -37,6 +37,7 @@
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import org.apache.rocketmq.broker.exception.BrokerException;
 import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
 import org.apache.rocketmq.broker.latency.BrokerFastFailure;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
@@ -204,7 +205,7 @@ public boolean initialize() throws 
CloneNotSupportedException {
                 //load plugin
                 MessageStorePluginContext context = new 
MessageStorePluginContext(messageStoreConfig, brokerStatsManager, 
messageArrivingListener, brokerConfig);
                 this.messageStore = MessageStoreFactory.build(context, 
this.messageStore);
-            } catch (IOException e) {
+            } catch (IOException | BrokerException e) {
                 result = false;
                 e.printStackTrace();
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/exception/BrokerException.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/exception/BrokerException.java
new file mode 100644
index 000000000..1171acb18
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/exception/BrokerException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.exception;
+
+/**
+ * Checked exception signalling a broker failure, e.g. a store plugin that cannot be loaded.
+ */
+public class BrokerException extends Exception {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates a new exception with an error message.
+     *
+     * @param msg Error message.
+     */
+    public BrokerException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new exception with a throwable as a cause.
+     *
+     * @param cause Throwable cause.
+     */
+    public BrokerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a new exception with an error message and throwable as a cause.
+     *
+     * @param msg Error message.
+     * @param cause Throwable cause.
+     */
+    public BrokerException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 00257fd68..6f8c732c4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -28,9 +28,12 @@
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
+/**
+ * An abstract class that can be used to implement an alternative message 
store.
+ */
 public abstract class AbstractPluginMessageStore implements MessageStore {
-    protected MessageStore next = null;
-    protected MessageStorePluginContext context;
+    private final MessageStore next;
+    private final MessageStorePluginContext context;
 
     public AbstractPluginMessageStore(MessageStorePluginContext context, 
MessageStore next) {
         this.next = next;
@@ -233,5 +236,4 @@ public long getConfirmOffset() {
     public void setConfirmOffset(long phyOffset) {
         next.setConfirmOffset(phyOffset);
     }
-
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
index 8db538b84..eca0f6aa7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -17,29 +17,42 @@
 
 package org.apache.rocketmq.broker.plugin;
 
-import java.io.IOException;
 import java.lang.reflect.Constructor;
+import org.apache.rocketmq.broker.exception.BrokerException;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.MessageStore;
 
+/**
+ * Factory to load a plugin message store.
+ */
 public final class MessageStoreFactory {
-    public final static MessageStore build(MessageStorePluginContext context, 
MessageStore messageStore)
-        throws IOException {
-        String plugin = context.getBrokerConfig().getMessageStorePlugIn();
-        if (plugin != null && plugin.trim().length() != 0) {
-            String[] pluginClasses = plugin.split(",");
-            for (int i = pluginClasses.length - 1; i >= 0; --i) {
-                String pluginClass = pluginClasses[i];
-                try {
-                    @SuppressWarnings("unchecked")
-                    Class<AbstractPluginMessageStore> clazz = 
(Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
-                    Constructor<AbstractPluginMessageStore> construct = 
clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
-                    messageStore = construct.newInstance(context, 
messageStore);
-                } catch (Throwable e) {
-                    throw new RuntimeException(String.format(
-                        "Initialize plugin's class %s not found!", 
pluginClass), e);
-                }
+
+    /**
+     * Creates a new {@link MessageStore} by the store plugin class name 
specified in {@link BrokerConfig}.
+     *
+     * @param context Store plugin context.
+     * @param messageStore Default message store.
+     * @return The plugin {@link MessageStore} if configured, otherwise {@code messageStore}.
+     * @throws BrokerException If a plugin cannot be loaded.
+     */
+    public static MessageStore build(MessageStorePluginContext context,
+        MessageStore messageStore) throws BrokerException {
+        String pluginClass = context.getBrokerConfig().getMessageStorePlugIn();
+
+        if (pluginClass != null && !pluginClass.trim().isEmpty()) {
+            try {
+                @SuppressWarnings("unchecked")
+                Class<AbstractPluginMessageStore> clazz =
+                    (Class<AbstractPluginMessageStore>) 
Class.forName(pluginClass.trim());
+                Constructor<AbstractPluginMessageStore> construct =
+                    clazz.getConstructor(MessageStorePluginContext.class, 
MessageStore.class);
+                messageStore = construct.newInstance(context, messageStore);
+            } catch (Throwable e) {
+                throw new BrokerException(String.format(
+                    "Initialize plugin's class %s not found!", 
pluginClass.trim()), e);
             }
         }
+
         return messageStore;
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
index b822a2f75..6e29b07cf 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
@@ -22,15 +22,17 @@
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+/**
+ * Context for a message store plugin.
+ */
 public class MessageStorePluginContext {
-    private MessageStoreConfig messageStoreConfig;
-    private BrokerStatsManager brokerStatsManager;
-    private MessageArrivingListener messageArrivingListener;
-    private BrokerConfig brokerConfig;
-
-    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
-        BrokerStatsManager brokerStatsManager, MessageArrivingListener 
messageArrivingListener,
-        BrokerConfig brokerConfig) {
+    private final MessageStoreConfig messageStoreConfig;
+    private final BrokerStatsManager brokerStatsManager;
+    private final MessageArrivingListener messageArrivingListener;
+    private final BrokerConfig brokerConfig;
+
+    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, 
BrokerStatsManager brokerStatsManager,
+        MessageArrivingListener messageArrivingListener, BrokerConfig 
brokerConfig) {
         super();
         this.messageStoreConfig = messageStoreConfig;
         this.brokerStatsManager = brokerStatsManager;
@@ -53,5 +55,4 @@ public MessageArrivingListener getMessageArrivingListener() {
     public BrokerConfig getBrokerConfig() {
         return brokerConfig;
     }
-
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/plugin/MessageStoreFactoryTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/plugin/MessageStoreFactoryTest.java
new file mode 100644
index 000000000..4d11fbd96
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/plugin/MessageStoreFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.plugin;
+
+import org.apache.rocketmq.broker.exception.BrokerException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MessageStoreFactoryTest {
+    @Mock
+    private DefaultMessageStore messageStore;
+
+    @Test
+    public void buildWithoutPluginConfigured() throws Exception {
+        MessageStorePluginContext ctx = new MessageStorePluginContext(new 
MessageStoreConfig(), null, null, new BrokerConfig());
+        assertThat(MessageStoreFactory.build(ctx, 
messageStore)).isInstanceOf(DefaultMessageStore.class);
+    }
+
+    @Test
+    public void buildWithPluginMisConfigured() throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setMessageStorePlugIn("NonExistentPlugin ");
+        MessageStorePluginContext ctx = new MessageStorePluginContext(new 
MessageStoreConfig(), null, null, brokerConfig);
+        try {
+            MessageStoreFactory.build(ctx, messageStore);
+            fail("No exception!");
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(BrokerException.class);
+        }
+    }
+
+    @Test
+    public void buildWithPluginConfigured() throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        
brokerConfig.setMessageStorePlugIn("org.apache.rocketmq.broker.plugin.MessageStoreFactoryTest$PluginStoreMock
 ");
+        MessageStorePluginContext ctx = new MessageStorePluginContext(new 
MessageStoreConfig(), null, null, brokerConfig);
+        assertThat(MessageStoreFactory.build(ctx, 
messageStore)).isInstanceOf(PluginStoreMock.class);
+    }
+
+    /**
+     * Minimal plugin store that the factory is expected to load in the test above.
+     */
+    static class PluginStoreMock extends AbstractPluginMessageStore {
+
+        public PluginStoreMock(MessageStorePluginContext context, MessageStore 
next) {
+            super(context, next);
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to