http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java index cd66699..0830842 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java index 3c1ef3d..405781b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener; import org.apache.rocketmq.common.message.MessageQueue; - /** * Consumer Orderly consumption context * @@ -28,32 +27,26 @@ public class ConsumeOrderlyContext { private boolean autoCommit = true; private long suspendCurrentQueueTimeMillis = -1; - public ConsumeOrderlyContext(MessageQueue messageQueue) { this.messageQueue = messageQueue; } - public boolean isAutoCommit() { return autoCommit; } - public void setAutoCommit(boolean autoCommit) { this.autoCommit = autoCommit; } - public MessageQueue getMessageQueue() { return messageQueue; } - public long getSuspendCurrentQueueTimeMillis() { return suspendCurrentQueueTimeMillis; } - public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java index 2e55d89..0c6c6e6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java index 99083b4..5de4ded 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeReturnType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java index 5d05452..2a9e5c9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListener.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java index 1c59ef7..c083157 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerConcurrently.java @@ -6,20 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener; -import org.apache.rocketmq.common.message.MessageExt; - import java.util.List; - +import org.apache.rocketmq.common.message.MessageExt; /** * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently @@ -37,5 +35,5 @@ public interface MessageListenerConcurrently extends MessageListener { * @return The consume status */ ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, - final ConsumeConcurrentlyContext context); + final ConsumeConcurrentlyContext context); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java index 5de976f..57a553a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java @@ -6,20 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.listener; -import org.apache.rocketmq.common.message.MessageExt; - import java.util.List; - +import org.apache.rocketmq.common.message.MessageExt; /** * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread @@ -37,5 +35,5 @@ public interface MessageListenerOrderly extends MessageListener { * @return The consume status */ ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs, - final ConsumeOrderlyContext context); + final ConsumeOrderlyContext context); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java index 218f659..256c639 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java @@ -16,25 +16,22 @@ */ package org.apache.rocketmq.client.consumer.rebalance; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.List; - - /** * Average Hashing queue algorithm - * */ public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { private final Logger log = ClientLogger.getLog(); @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { + List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } @@ -48,17 +45,17 @@ public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrate List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", - consumerGroup, - currentCID, - cidAll); + consumerGroup, + currentCID, + cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = - mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() - + 1 : mqAll.size() / cidAll.size()); + mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java index d612d4f..5df5cd2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java @@ -16,25 +16,22 @@ */ package org.apache.rocketmq.client.consumer.rebalance; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.List; - - /** * Cycle average Hashing queue algorithm - * */ public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { private final Logger log = ClientLogger.getLog(); @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { + List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } @@ -48,9 +45,9 @@ public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQue List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", - consumerGroup, - currentCID, - cidAll); + consumerGroup, + currentCID, + cidAll); return result; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java index c8fe2d1..387822d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java @@ -6,28 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.rebalance; +import java.util.List; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.List; - - public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { private List<MessageQueue> messageQueueList; @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { + List<String> cidAll) { return this.messageQueueList; } @@ -40,7 +38,6 @@ public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrateg return messageQueueList; } - public void setMessageQueueList(List<MessageQueue> messageQueueList) { this.messageQueueList = messageQueueList; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java index adfc124..a154f89 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java @@ -6,23 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.rebalance; -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.ArrayList; import java.util.List; import java.util.Set; - +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.MessageQueue; /** * Computer room Hashing queue algorithm, such as Alipay logic room @@ -32,7 +30,7 @@ public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueSt @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, - List<String> cidAll) { + List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); int currentIndex = cidAll.indexOf(currentCID); if (currentIndex < 0) { @@ -68,7 +66,6 @@ public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueSt return consumeridcs; } - public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index bdaeb58..053ade2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.client.consumer.store; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -27,41 +34,29 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - /** * Local storage implementation - * */ public class LocalFileOffsetStore implements OffsetStore { public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty( - "rocketmq.client.localOffsetStoreDir", - System.getProperty("user.home") + File.separator + ".rocketmq_offsets"); + "rocketmq.client.localOffsetStoreDir", + System.getProperty("user.home") + File.separator + ".rocketmq_offsets"); private final static Logger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String groupName; private final String storePath; private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(); - + new ConcurrentHashMap<MessageQueue, AtomicLong>(); public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { this.mQClientFactory = mQClientFactory; this.groupName = groupName; this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + // - this.mQClientFactory.getClientId() + File.separator + // - this.groupName + File.separator + // - "offsets.json"; + this.mQClientFactory.getClientId() + File.separator + // + this.groupName + File.separator + // + "offsets.json"; } - @Override public void load() throws MQClientException { OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); @@ -71,14 +66,13 @@ public class LocalFileOffsetStore implements OffsetStore { for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) { AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); log.info("load consumer's offset, {} {} {}", - this.groupName, - mq, - offset.get()); + this.groupName, + mq, + offset.get()); } } } - @Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { @@ -97,7 +91,6 @@ public class LocalFileOffsetStore implements OffsetStore { } } - @Override public long readOffset(final MessageQueue mq, final ReadOffsetType type) { if (mq != null) { @@ -134,7 +127,6 @@ public class LocalFileOffsetStore implements OffsetStore { return -1; } - @Override public void persistAll(Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) @@ -158,7 +150,6 @@ public class LocalFileOffsetStore implements OffsetStore { } } - @Override public void persist(MessageQueue mq) { } @@ -170,7 +161,7 @@ public class LocalFileOffsetStore implements OffsetStore { @Override public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { } @@ -196,7 +187,7 @@ public class LocalFileOffsetStore implements OffsetStore { OffsetSerializeWrapper offsetSerializeWrapper = null; try { offsetSerializeWrapper = - OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); + OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); } catch (Exception e) { log.warn("readLocalOffset Exception, and try to correct", e); return this.readLocalOffsetBak(); @@ -212,12 +203,12 @@ public class LocalFileOffsetStore implements OffsetStore { OffsetSerializeWrapper offsetSerializeWrapper = null; try { offsetSerializeWrapper = - OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); + OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); } catch (Exception e) { log.warn("readLocalOffset Exception", e); throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" // - + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), // - e); + + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), // + e); } return offsetSerializeWrapper; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java index a9fadf2..4954f6f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.store; -import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; - +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; /** * Wrapper class for offset serialization @@ -29,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class OffsetSerializeWrapper extends RemotingSerializable { private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(); + new ConcurrentHashMap<MessageQueue, AtomicLong>(); public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { return offsetTable; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java index 334f0a1..592796f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.store; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.util.Map; -import java.util.Set; - - /** * Offset store interface * @@ -37,7 +35,6 @@ public interface OffsetStore { */ void load() throws MQClientException; - /** * Update the offset,store it in memory * @@ -91,6 +88,6 @@ public interface OffsetStore { * @param offset * @param isOneway */ - void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException; + void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java index c2ee9b7..da16765 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.consumer.store; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 4adc18c..32ef877 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.client.consumer.store; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.FindBrokerResult; @@ -29,37 +35,25 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHea import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - /** * Remote storage implementation - * */ public class RemoteBrokerOffsetStore implements OffsetStore { private final static Logger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String groupName; private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(); - + new ConcurrentHashMap<MessageQueue, AtomicLong>(); public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { this.mQClientFactory = mQClientFactory; this.groupName = groupName; } - @Override public void load() { } - @Override public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { if (mq != null) { @@ -78,7 +72,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } } - @Override public long readOffset(final MessageQueue mq, final ReadOffsetType type) { if (mq != null) { @@ -117,7 +110,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore { return -1; } - @Override public void persistAll(Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) @@ -133,10 +125,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { try { this.updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", - this.groupName, - this.mQClientFactory.getClientId(), - mq, - offset.get()); + this.groupName, + this.mQClientFactory.getClientId(), + mq, + offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } @@ -155,7 +147,6 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } } - @Override public void persist(MessageQueue mq) { AtomicLong offset = this.offsetTable.get(mq); @@ -163,10 +154,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { try { this.updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", - this.groupName, - this.mQClientFactory.getClientId(), - mq, - offset.get()); + this.groupName, + this.mQClientFactory.getClientId(), + mq, + offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } @@ -177,7 +168,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { if (mq != null) { this.offsetTable.remove(mq); log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq, - offsetTable.size()); + offsetTable.size()); } } @@ -199,7 +190,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { * here need to be optimized. */ private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException { + MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true); } @@ -209,7 +200,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { */ @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException { + MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { // TODO Here may be heavily overhead for Name Server,need tuning @@ -226,10 +217,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } } else { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); @@ -237,7 +228,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { } private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException { + InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { // TODO Here may be heavily overhead for Name Server,need tuning @@ -252,7 +243,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { requestHeader.setQueueId(mq.getQueueId()); return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java index ce4bedb..7515a30 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java @@ -6,39 +6,35 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.exception; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.help.FAQUrl; - public class MQBrokerException extends Exception { private static final long serialVersionUID = 5975020272601250368L; private final int responseCode; private final String errorMessage; - public MQBrokerException(int responseCode, String errorMessage) { super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " - + errorMessage)); + + errorMessage)); this.responseCode = responseCode; this.errorMessage = errorMessage; } - public int getResponseCode() { return responseCode; } - public String getErrorMessage() { return errorMessage; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java index 7ffab0d..41a2b59 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java @@ -6,36 +6,33 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.exception; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.help.FAQUrl; - public class MQClientException extends Exception { private static final long serialVersionUID = -5758410930844185841L; private int responseCode; private String errorMessage; - public MQClientException(String errorMessage, Throwable cause) { super(FAQUrl.attachDefaultURL(errorMessage), cause); this.responseCode = -1; this.errorMessage = errorMessage; } - public MQClientException(int responseCode, String errorMessage) { super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " - + errorMessage)); + + errorMessage)); this.responseCode = responseCode; this.errorMessage = errorMessage; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java index e84beff..cf6c157 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; @@ -21,7 +21,6 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; - public class CheckForbiddenContext { private String nameSrvAddr; private String group; @@ -34,112 +33,91 @@ public class CheckForbiddenContext { private Object arg; private boolean unitMode = false; - public String getGroup() { return group; } - public void setGroup(String group) { this.group = group; } - public Message getMessage() { return message; } - public void setMessage(Message message) { this.message = message; } - public MessageQueue getMq() { return mq; } - public void setMq(MessageQueue mq) { this.mq = mq; } - public String getBrokerAddr() { return brokerAddr; } - public void setBrokerAddr(String brokerAddr) { this.brokerAddr = brokerAddr; } - public CommunicationMode getCommunicationMode() { return communicationMode; } - public void setCommunicationMode(CommunicationMode communicationMode) { this.communicationMode = communicationMode; } - public SendResult getSendResult() { return sendResult; } - public void setSendResult(SendResult sendResult) { this.sendResult = sendResult; } - public Exception getException() { return exception; } - public void setException(Exception exception) { this.exception = exception; } - public Object getArg() { return arg; } - public void setArg(Object arg) { this.arg = arg; } - public boolean isUnitMode() { return unitMode; } - public void setUnitMode(boolean isUnitMode) { this.unitMode = isUnitMode; } - public String getNameSrvAddr() { return nameSrvAddr; } - public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } - @Override public String toString() { return "SendMessageContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message - + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode - + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode - + ", arg=" + arg + "]"; + + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode + + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode + + ", arg=" + arg + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java index d6f75bb..7faf14b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java @@ -6,23 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; import org.apache.rocketmq.client.exception.MQClientException; - public interface CheckForbiddenHook { String hookName(); - void checkForbidden(final CheckForbiddenContext context) throws MQClientException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java index f141fac..5bababa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.List; import java.util.Map; - +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; public class ConsumeMessageContext { private String consumerGroup; @@ -32,72 +30,58 @@ public class ConsumeMessageContext { private Object mqTraceContext; private Map<String, String> props; - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public List<MessageExt> getMsgList() { return msgList; } - public void setMsgList(List<MessageExt> msgList) { this.msgList = msgList; } - public MessageQueue getMq() { return mq; } - public void setMq(MessageQueue mq) { this.mq = mq; } - public boolean isSuccess() { return success; } - public void setSuccess(boolean success) { this.success = success; } - public Object getMqTraceContext() { return mqTraceContext; } - public void setMqTraceContext(Object mqTraceContext) { this.mqTraceContext = mqTraceContext; } - public Map<String, String> getProps() { return props; } - public void setProps(Map<String, String> props) { this.props = props; } - public String getStatus() { return status; } - public void setStatus(String status) { this.status = status; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java index 8161d2e..95db7b2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java index 23340d3..bc22546 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java @@ -6,22 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; +import java.util.List; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.List; - - public class FilterMessageContext { private String consumerGroup; private List<MessageExt> msgList; @@ -29,60 +27,49 @@ public class FilterMessageContext { private Object arg; private boolean unitMode; - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public List<MessageExt> getMsgList() { return msgList; } - public void setMsgList(List<MessageExt> msgList) { this.msgList = msgList; } - public MessageQueue getMq() { return mq; } - public void setMq(MessageQueue mq) { this.mq = mq; } - public Object getArg() { return arg; } - public void setArg(Object arg) { this.arg = arg; } - public boolean isUnitMode() { return unitMode; } - public void setUnitMode(boolean isUnitMode) { this.unitMode = isUnitMode; } - @Override public String toString() { return "ConsumeMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq=" - + mq + ", arg=" + arg + "]"; + + mq + ", arg=" + arg + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java index 48fd513..095de32 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; public interface FilterMessageHook { String hookName(); - void filterMessage(final FilterMessageContext context); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java index bfb4a47..34e22a3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java @@ -6,16 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; +import java.util.Map; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.SendResult; @@ -23,9 +24,6 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageType; -import java.util.Map; - - public class SendMessageContext { private String producerGroup; private Message message; @@ -60,97 +58,78 @@ public class SendMessageContext { return producerGroup; } - public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } - public Message getMessage() { return message; } - public void setMessage(Message message) { this.message = message; } - public MessageQueue getMq() { return mq; } - public void setMq(MessageQueue mq) { this.mq = mq; } - public String getBrokerAddr() { return brokerAddr; } - public void setBrokerAddr(String brokerAddr) { this.brokerAddr = brokerAddr; } - public CommunicationMode getCommunicationMode() { return communicationMode; } - public void setCommunicationMode(CommunicationMode communicationMode) { this.communicationMode = communicationMode; } - public SendResult getSendResult() { return sendResult; } - public void setSendResult(SendResult sendResult) { this.sendResult = sendResult; } - public Exception getException() { return exception; } - public void setException(Exception exception) { this.exception = exception; } - public Object getMqTraceContext() { return mqTraceContext; } - public void setMqTraceContext(Object mqTraceContext) { this.mqTraceContext = mqTraceContext; } - public Map<String, String> getProps() { return props; } - public void setProps(Map<String, String> props) { this.props = props; } - public String getBornHost() { return bornHost; } - public void setBornHost(String bornHost) { this.bornHost = bornHost; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java index c040831..16a86a0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java +++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.hook; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index bb008bf..46ce08c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -6,16 +6,20 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl; +import io.netty.channel.ChannelHandlerContext; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.MQProducerInner; import org.apache.rocketmq.client.log.ClientLogger; @@ -30,29 +34,26 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; +import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; -import org.apache.rocketmq.common.protocol.header.*; import org.slf4j.Logger; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - - public class ClientRemotingProcessor implements NettyRequestProcessor { private final Logger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; - public ClientRemotingProcessor(final MQClientInstance mqClientFactory) { this.mqClientFactory = mqClientFactory; } - @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { @@ -83,7 +84,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final CheckTransactionStateRequestHeader requestHeader = - (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); + (CheckTransactionStateRequestHeader)request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); final MessageExt messageExt = MessageDecoder.decode(byteBuffer); if (messageExt != null) { @@ -109,10 +110,10 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { try { final NotifyConsumerIdsChangedRequestHeader requestHeader = - (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); + (NotifyConsumerIdsChangedRequestHeader)request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.getConsumerGroup()); + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.getConsumerGroup()); this.mqClientFactory.rebalanceImmediately(); } catch (Exception e) { log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e)); @@ -122,10 +123,11 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = - (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); + (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp()}); + new Object[] { + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp()}); Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); if (request.getBody() != null) { ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class); @@ -139,7 +141,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerStatusRequestHeader requestHeader = - (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); + (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup()); GetConsumerStatusBody body = new GetConsumerStatusBody(); @@ -152,7 +154,7 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerRunningInfoRequestHeader requestHeader = - (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + (GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup()); if (null != consumerRunningInfo) { @@ -175,13 +177,13 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumeMessageDirectlyResultRequestHeader requestHeader = - (ConsumeMessageDirectlyResultRequestHeader) request - .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); + (ConsumeMessageDirectlyResultRequestHeader)request + .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody())); ConsumeMessageDirectlyResult result = - this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName()); + this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName()); if (null != result) { response.setCode(ResponseCode.SUCCESS); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java index 9af6794..07b3a36 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java index 8773f26..c6405d8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl; @@ -20,18 +20,15 @@ public class FindBrokerResult { private final String brokerAddr; private final boolean slave; - public FindBrokerResult(String brokerAddr, boolean slave) { this.brokerAddr = brokerAddr; this.slave = slave; } - public String getBrokerAddr() { return brokerAddr; } - public boolean isSlave() { return slave; }
