This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 95cd06a0a78 Subscription: display the now value as the actual
timestamp when showing the topic (#13020)
95cd06a0a78 is described below
commit 95cd06a0a789dda02913f4de8aa5a7f1430828b8
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Jul 25 22:51:19 2024 +0800
Subscription: display the now value as the actual timestamp when showing
the topic (#13020)
---
.../iotdb/rpc/subscription/config/TopicConfig.java | 26 ++++++-------------
.../subscription/topic/CreateTopicProcedure.java | 6 +----
.../config/executor/ClusterConfigTaskExecutor.java | 30 +++++++++++++++++-----
.../commons/subscription/meta/topic/TopicMeta.java | 4 +--
4 files changed, 35 insertions(+), 31 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 21a4b4873c8..fc967835dd2 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -88,26 +88,16 @@ public class TopicConfig extends PipeParameters {
attributes.getOrDefault(TopicConstant.PATH_KEY,
TopicConstant.PATH_DEFAULT_VALUE));
}
- public Map<String, String> getAttributesWithTimeRange(final long
creationTime) {
+ public Map<String, String> getAttributesWithTimeRange() {
final Map<String, String> attributesWithTimeRange = new HashMap<>();
- // parse start time
- final String startTime =
- attributes.getOrDefault(TopicConstant.START_TIME_KEY,
String.valueOf(Long.MIN_VALUE));
- if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(startTime)) {
- attributesWithTimeRange.put(TopicConstant.START_TIME_KEY,
String.valueOf(creationTime));
- } else {
- attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, startTime);
- }
-
- // parse end time
- final String endTime =
- attributes.getOrDefault(TopicConstant.END_TIME_KEY,
String.valueOf(Long.MAX_VALUE));
- if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(endTime)) {
- attributesWithTimeRange.put(TopicConstant.END_TIME_KEY,
String.valueOf(creationTime));
- } else {
- attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime);
- }
+ // there should be no TopicConstant.NOW_TIME_VALUE here
+ attributesWithTimeRange.put(
+ TopicConstant.START_TIME_KEY,
+ attributes.getOrDefault(TopicConstant.START_TIME_KEY,
String.valueOf(Long.MIN_VALUE)));
+ attributesWithTimeRange.put(
+ TopicConstant.END_TIME_KEY,
+ attributes.getOrDefault(TopicConstant.END_TIME_KEY,
String.valueOf(Long.MAX_VALUE)));
return attributesWithTimeRange;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index afdbfe244d7..ed3d59bd3d4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -20,9 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
-import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -78,9 +76,7 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
topicMeta =
new TopicMeta(
createTopicReq.getTopicName(),
- CommonDateTimeUtils.convertMilliTimeWithPrecision(
- System.currentTimeMillis(),
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ System.currentTimeMillis(),
createTopicReq.getTopicAttributes());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e655d1fc71d..8587fc606dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -222,6 +222,7 @@ import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.trigger.api.Trigger;
@@ -1991,14 +1992,31 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final String topicName = createTopicStatement.getTopicName();
final Map<String, String> topicAttributes =
createTopicStatement.getTopicAttributes();
+ // Replace now value with current time (raw timestamp based on system
timestamp precision)
+ final long currentTime =
+ CommonDateTimeUtils.convertMilliTimeWithPrecision(
+ System.currentTimeMillis(),
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ topicAttributes.computeIfPresent(
+ TopicConstant.START_TIME_KEY,
+ (k, v) -> {
+ if (TopicConstant.NOW_TIME_VALUE.equals(v)) {
+ return String.valueOf(currentTime);
+ }
+ return v;
+ });
+ topicAttributes.computeIfPresent(
+ TopicConstant.END_TIME_KEY,
+ (k, v) -> {
+ if (TopicConstant.NOW_TIME_VALUE.equals(v)) {
+ return String.valueOf(currentTime);
+ }
+ return v;
+ });
+
// Validate topic config
final TopicMeta temporaryTopicMeta =
- new TopicMeta(
- topicName,
- CommonDateTimeUtils.convertMilliTimeWithPrecision(
- System.currentTimeMillis(),
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
- topicAttributes);
+ new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
try {
PipeDataNodeAgent.plugin()
.validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 14c48c85da3..e3bde8a0017 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -39,7 +39,7 @@ import java.util.Set;
public class TopicMeta {
private String topicName;
- private long creationTime; // raw timestamp based on system timestamp
precision
+ private long creationTime; // unit in ms
private TopicConfig config;
private Set<String> subscribedConsumerGroupIds;
@@ -182,7 +182,7 @@ public class TopicMeta {
// path
extractorAttributes.putAll(config.getAttributesWithPathOrPattern());
// time
-
extractorAttributes.putAll(config.getAttributesWithTimeRange(creationTime));
+ extractorAttributes.putAll(config.getAttributesWithTimeRange());
// realtime mode
extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
// source mode