This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-469 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 0476e759db9886d775a3ee7f0393e10ca2f5f768 Author: gosonzhang <[email protected]> AuthorDate: Wed Jan 13 19:51:52 2021 +0800 [TUBEMQ-511]Replace the conditional operator (?:) with mid() --- .../apache/tubemq/server/broker/msgstore/MessageStore.java | 13 +++++-------- .../tubemq/server/broker/offset/DefaultOffsetManager.java | 10 ++++------ .../apache/tubemq/server/broker/web/BrokerAdminServlet.java | 13 +++++-------- .../java/org/apache/tubemq/server/master/MasterConfig.java | 3 ++- .../org/apache/tubemq/server/tools/cli/CliConsumer.java | 2 +- .../org/apache/tubemq/server/tools/cli/CliProducer.java | 2 +- 6 files changed, 18 insertions(+), 25 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java index 8f650b4..1987c32 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java @@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.TErrCodeConstants; import org.apache.tubemq.corebase.protobuf.generated.ClientBroker; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.ThreadUtils; import org.apache.tubemq.server.broker.BrokerConfig; import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; @@ -133,10 +134,8 @@ public class MessageStore implements Closeable { this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize()); this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl(); int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum; - memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000 - ? 6000 : (Math.min(tmpIndexReadCnt, 10000))); - fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000 - ? 8000 : (Math.min(tmpIndexReadCnt, 13500))); + memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000)); + fileMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 8000, 13500)); memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2); fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3); fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10); @@ -408,10 +407,8 @@ public class MessageStore implements Closeable { unflushDataHold.set(topicMetadata.getUnflushDataHold()); maxFileValidDurMs.set(parseDeletePolicy(topicMetadata.getDeletePolicy())); int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum; - memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000 - ? 6000 : (Math.min(tmpIndexReadCnt, 10000))); - fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000 - ? 8000 : (Math.min(tmpIndexReadCnt, 13500))); + memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000)); + fileMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 8000, 13500)); memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2); fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3); fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java index 0e91dc2..50c8bd3 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.daemon.AbstractDaemonService; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.corebase.utils.Tuple3; @@ -121,8 +122,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse || (readStatus == TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS)) { long adjOffset = indexMaxOffset; if (readStatus != TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS) { - adjOffset = Math.min(reqOffset, indexMaxOffset); - adjOffset = Math.max(adjOffset, indexMinOffset); + adjOffset = MixedUtils.mid(reqOffset, indexMinOffset, indexMaxOffset); } regInfo.getAndSetOffset(adjOffset); } @@ -287,10 +287,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse long reSetOffset, final String modifier) { long oldOffset = -1; if (store != null) { - long firstOffset = store.getIndexMinOffset(); - long lastOffset = store.getIndexMaxOffset(); - reSetOffset = reSetOffset < firstOffset - ? firstOffset : Math.min(reSetOffset, lastOffset); + reSetOffset = MixedUtils.mid(reSetOffset, + store.getIndexMinOffset(), store.getIndexMaxOffset()); String offsetCacheKey = getOffsetCacheKey(topic, partitionId); getAndResetTmpOffset(group, offsetCacheKey); OffsetStorageInfo regInfo = diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index d35ba91..dd4bbdf 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import javax.servlet.http.HttpServletRequest; import org.apache.tubemq.corebase.TokenConstants; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corebase.utils.Tuple2; import org.apache.tubemq.corebase.utils.Tuple3; @@ -959,11 +960,9 @@ public class BrokerAdminServlet extends AbstractWebHandler { if (store == null) { continue; } - long firstOffset = store.getIndexMinOffset(); - long lastOffset = store.getIndexMaxOffset(); // adjust reset offset value - adjOffset = offsetTuple.getF0() < firstOffset - ? firstOffset : Math.min(offsetTuple.getF0(), lastOffset); + adjOffset = MixedUtils.mid(offsetTuple.getF0(), + store.getIndexMinOffset(), store.getIndexMaxOffset()); result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset)); } } @@ -1059,10 +1058,8 @@ public class BrokerAdminServlet extends AbstractWebHandler { if (store == null) { continue; } - long firstOffset = store.getIndexMinOffset(); - long lastOffset = store.getIndexMaxOffset(); - adjOffset = entry.getValue() < firstOffset - ? firstOffset : Math.min(entry.getValue(), lastOffset); + adjOffset = MixedUtils.mid(entry.getValue(), + store.getIndexMinOffset(), store.getIndexMaxOffset()); offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset)); } if (offsetVals.isEmpty()) { diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java index b112d66..c2a8007 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java @@ -22,6 +22,7 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.tubemq.corebase.TBaseConstants; import org.apache.tubemq.corebase.config.TLSConfig; import org.apache.tubemq.corebase.utils.AddressUtils; +import org.apache.tubemq.corebase.utils.MixedUtils; import org.apache.tubemq.corebase.utils.TStringUtils; import org.apache.tubemq.corerpc.RpcConstants; import org.apache.tubemq.server.common.TServerConstants; @@ -467,7 +468,7 @@ public class MasterConfig extends AbstractFileConfig { } if (TStringUtils.isNotBlank(masterConf.get("rebalanceParallel"))) { int tmpParallel = this.getInt(masterConf, "rebalanceParallel"); - this.rebalanceParallel = (tmpParallel <= 0) ? 1 : (Math.min(tmpParallel, 20)); + this.rebalanceParallel = MixedUtils.mid(tmpParallel, 1, 20); } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java index c8938a6..52f0800 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java @@ -177,7 +177,7 @@ public class CliConsumer extends CliAbstractBase { String fetchThreadCntStr = cli.getOptionValue(CliArgDef.FETCHTHREADS.longOpt); if (TStringUtils.isNotBlank(fetchThreadCntStr)) { int tmpFetchThreadCnt = Integer.parseInt(fetchThreadCntStr); - tmpFetchThreadCnt = (tmpFetchThreadCnt < 1) ? 1 : Math.min(tmpFetchThreadCnt, 100); + tmpFetchThreadCnt = MixedUtils.mid(tmpFetchThreadCnt, 1, 100); fetchThreadCnt = tmpFetchThreadCnt; } return true; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java index a6e7e38..8a01e29 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java @@ -152,7 +152,7 @@ public class CliProducer extends CliAbstractBase { String sendThreadCntStr = cli.getOptionValue(CliArgDef.SENDTHREADS.longOpt); if (TStringUtils.isNotBlank(sendThreadCntStr)) { int tmpThreadCnt = Integer.parseInt(sendThreadCntStr); - tmpThreadCnt = (tmpThreadCnt < 1) ? 1 : Math.min(tmpThreadCnt, 200); + tmpThreadCnt = MixedUtils.mid(tmpThreadCnt, 1, 200); sendThreadCnt = tmpThreadCnt; } String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
