This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6c0485295 minor improvement for logs and metrics
6c0485295 is described below
commit 6c04852955b153e4b518eae2c815e7054662cd25
Author: SSpirits <[email protected]>
AuthorDate: Tue Feb 14 17:10:49 2023 +0800
minor improvement for logs and metrics
---
.../broker/metrics/BrokerMetricsManager.java | 16 ++++++++-
.../remoting/metrics/RemotingMetricsManager.java | 3 ++
.../rocketmq/tieredstore/TieredDispatcher.java | 7 ++--
.../rocketmq/tieredstore/TieredMessageStore.java | 18 +++++-----
.../common/TieredMessageStoreConfig.java | 38 ++++++++++++++++++++++
.../container/TieredContainerManager.java | 3 ++
.../metrics/TieredStoreMetricsManager.java | 9 +++--
7 files changed, 78 insertions(+), 16 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 060b051ff..c8033ba44 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -36,6 +36,7 @@ import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -131,6 +132,12 @@ public class BrokerMetricsManager {
public static ObservableLongGauge consumerReadyMessages = new
NopObservableLongGauge();
public static LongCounter sendToDlqMessages = new NopLongCounter();
+ public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new
ArrayList<String>() {
+ {
+ add(MixAll.CID_RMQ_SYS_PREFIX.toLowerCase());
+ }
+ };
+
public BrokerMetricsManager(BrokerController brokerController) {
this.brokerController = brokerController;
brokerConfig = brokerController.getBrokerConfig();
@@ -165,7 +172,13 @@ public class BrokerMetricsManager {
if (StringUtils.isBlank(group)) {
return false;
}
- return
group.toLowerCase().startsWith(MixAll.CID_RMQ_SYS_PREFIX.toLowerCase());
+ String groupInLowerCase = group.toLowerCase();
+ for (String prefix : SYSTEM_GROUP_PREFIX_LIST) {
+ if (groupInLowerCase.startsWith(prefix)) {
+ return true;
+ }
+ }
+ return false;
}
public static boolean isSystem(String topic, String group) {
@@ -439,6 +452,7 @@ public class BrokerMetricsManager {
.put(LABEL_VERSION,
MQVersion.getVersionDesc(attr.version).toLowerCase())
.put(LABEL_CONSUME_MODE,
attr.consumeMode.getTypeCN().toLowerCase())
.put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING)
+ .put(LABEL_IS_SYSTEM, isSystemGroup(attr.group))
.build();
measurement.record(count, attributes);
});
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
index 4ca1f033f..e76192eae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
@@ -66,6 +66,9 @@ public class RemotingMetricsManager {
public static List<Pair<InstrumentSelector, View>> getMetricsView() {
List<Double> rpcCostTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(1).toMillis(),
+ (double) Duration.ofMillis(3).toMillis(),
+ (double) Duration.ofMillis(5).toMillis(),
+ (double) Duration.ofMillis(7).toMillis(),
(double) Duration.ofMillis(10).toMillis(),
(double) Duration.ofMillis(100).toMillis(),
(double) Duration.ofSeconds(1).toMillis(),
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 0b1194953..7bc51d634 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -233,13 +233,14 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
try {
long queueOffset = container.getDispatchOffset();
- beforeOffset = queueOffset;
if (minOffsetInQueue > queueOffset) {
+ logger.warn("BlobDispatcher#dispatchByMQContainer: message
that needs to be dispatched does not exist: topic: {}, queueId: {}, message
queue offset: {}, min queue offset: {}",
+ topic, queueId, queueOffset, minOffsetInQueue);
container.initOffset(minOffsetInQueue);
queueOffset = minOffsetInQueue;
- logger.warn("TieredDispatcher#dispatchByMQContainer: message
that needs to be dispatched does not exist: topic: {}, queueId: {}, message
queue offset: {}, min queue offset: {}",
- topic, queueId, queueOffset, minOffsetInQueue);
}
+ beforeOffset = queueOffset;
+
// TODO flow control based on message size
long limit = Math.min(queueOffset + 100000, maxOffsetInQueue);
ConsumeQueue consumeQueue = (ConsumeQueue)
defaultStore.getConsumeQueue(topic, queueId);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 5ef1b2081..0ae891c77 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -22,7 +22,6 @@ import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.View;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -54,13 +53,13 @@ import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
public class TieredMessageStore extends AbstractPluginMessageStore {
- private static final Logger logger =
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
- private final TieredMessageFetcher fetcher;
- private final TieredDispatcher dispatcher;
- private final String brokerName;
- private final TieredMessageStoreConfig storeConfig;
- private final TieredContainerManager containerManager;
- private final TieredMetadataStore metadataStore;
+ protected static final Logger logger =
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ protected final TieredMessageFetcher fetcher;
+ protected final TieredDispatcher dispatcher;
+ protected final String brokerName;
+ protected final TieredMessageStoreConfig storeConfig;
+ protected final TieredContainerManager containerManager;
+ protected final TieredMetadataStore metadataStore;
public TieredMessageStore(MessageStorePluginContext context, MessageStore
next) {
super(context, next);
@@ -320,8 +319,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
@Override
public List<Pair<InstrumentSelector, View>> getMetricsView() {
- List<Pair<InstrumentSelector, View>> res = new ArrayList<>();
- res.addAll(next.getMetricsView());
+ List<Pair<InstrumentSelector, View>> res = super.getMetricsView();
res.addAll(TieredStoreMetricsManager.getMetricsView());
return res;
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index 6cc51f541..2d43b9f6b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -99,6 +99,12 @@ public class TieredMessageStoreConfig {
private String tieredStoreFilepath = "";
+ // only for oss storage provider
+ private String ossEndpoint = "";
+ private String ossBucket = "";
+ private String ossAccessKey = "";
+ private String ossSecretKey = "";
+
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
@@ -331,4 +337,36 @@ public class TieredMessageStoreConfig {
public void setTieredStoreFilepath(String tieredStoreFilepath) {
this.tieredStoreFilepath = tieredStoreFilepath;
}
+
+ public String getOssEndpoint() {
+ return ossEndpoint;
+ }
+
+ public void setOssEndpoint(String ossEndpoint) {
+ this.ossEndpoint = ossEndpoint;
+ }
+
+ public String getOssBucket() {
+ return ossBucket;
+ }
+
+ public void setOssBucket(String ossBucket) {
+ this.ossBucket = ossBucket;
+ }
+
+ public String getOssAccessKey() {
+ return ossAccessKey;
+ }
+
+ public void setOssAccessKey(String ossAccessKey) {
+ this.ossAccessKey = ossAccessKey;
+ }
+
+ public String getOssSecretKey() {
+ return ossSecretKey;
+ }
+
+ public void setOssSecretKey(String ossSecretKey) {
+ this.ossSecretKey = ossSecretKey;
+ }
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
index 20cd32d9f..ca2f4f81f 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
@@ -233,6 +233,9 @@ public class TieredContainerManager {
public void destroyContainer(MessageQueue mq) {
TieredMessageQueueContainer container =
messageQueueContainerMap.remove(mq);
if (container != null) {
+ MessageQueue messageQueue = container.getMessageQueue();
+ logger.info("BlobContainerManager#destroyContainer: try to destroy
container: topic: {}, queueId: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId());
container.destroy();
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 0b0dfd63a..2e07738bc 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -77,6 +77,7 @@ import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant
public class TieredStoreMetricsManager {
private static final Logger logger =
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+ private static String storageMedium = STORAGE_MEDIUM_BLOB;
public static LongHistogram apiLatency = new NopLongHistogram();
@@ -113,7 +114,7 @@ public class TieredStoreMetricsManager {
.build();
View rpcLatencyView = View.builder()
-
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 10d,
100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000)))
+
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 3d, 5d,
7d, 10d, 100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000)))
.setDescription("tiered_store_rpc_latency_view")
.build();
@@ -139,6 +140,10 @@ public class TieredStoreMetricsManager {
return res;
}
+ public static void setStorageMedium(String storageMedium) {
+ TieredStoreMetricsManager.storageMedium = storageMedium;
+ }
+
public static void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier,
TieredMessageStoreConfig storeConfig, TieredMessageFetcher fetcher,
MessageStore next) {
TieredStoreMetricsManager.attributesBuilderSupplier =
attributesBuilderSupplier;
@@ -318,6 +323,6 @@ public class TieredStoreMetricsManager {
public static AttributesBuilder newAttributesBuilder() {
AttributesBuilder builder = attributesBuilderSupplier != null ?
attributesBuilderSupplier.get() : Attributes.builder();
return builder.put(LABEL_STORAGE_TYPE, "tiered")
- .put(LABEL_STORAGE_MEDIUM, STORAGE_MEDIUM_BLOB);
+ .put(LABEL_STORAGE_MEDIUM, storageMedium);
}
}