This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c064a8308ac9c54c9dd361f34e83f77b64349fba Author: V_Galaxy <[email protected]> AuthorDate: Thu Jul 25 22:51:19 2024 +0800 Subscription: display the now value as the actual timestamp when showing the topic (#13020) (cherry picked from commit 95cd06a0a789dda02913f4de8aa5a7f1430828b8) --- .../iotdb/rpc/subscription/config/TopicConfig.java | 26 ++++++------------- .../subscription/topic/CreateTopicProcedure.java | 6 +---- .../config/executor/ClusterConfigTaskExecutor.java | 30 +++++++++++++++++----- .../commons/subscription/meta/topic/TopicMeta.java | 4 +-- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java index 21a4b4873c8..fc967835dd2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java @@ -88,26 +88,16 @@ public class TopicConfig extends PipeParameters { attributes.getOrDefault(TopicConstant.PATH_KEY, TopicConstant.PATH_DEFAULT_VALUE)); } - public Map<String, String> getAttributesWithTimeRange(final long creationTime) { + public Map<String, String> getAttributesWithTimeRange() { final Map<String, String> attributesWithTimeRange = new HashMap<>(); - // parse start time - final String startTime = - attributes.getOrDefault(TopicConstant.START_TIME_KEY, String.valueOf(Long.MIN_VALUE)); - if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(startTime)) { - attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, String.valueOf(creationTime)); - } else { - attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, startTime); - } - - // parse end time - final String endTime = - attributes.getOrDefault(TopicConstant.END_TIME_KEY, String.valueOf(Long.MAX_VALUE)); - if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(endTime)) { - attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, String.valueOf(creationTime)); - } else { - attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime); - } + // there should be no TopicConstant.NOW_TIME_VALUE here + attributesWithTimeRange.put( + TopicConstant.START_TIME_KEY, + attributes.getOrDefault(TopicConstant.START_TIME_KEY, String.valueOf(Long.MIN_VALUE))); + attributesWithTimeRange.put( + TopicConstant.END_TIME_KEY, + attributes.getOrDefault(TopicConstant.END_TIME_KEY, String.valueOf(Long.MAX_VALUE))); return attributesWithTimeRange; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index afdbfe244d7..ed3d59bd3d4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -20,9 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.topic; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; -import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -78,9 +76,7 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure { topicMeta = new TopicMeta( createTopicReq.getTopicName(), - CommonDateTimeUtils.convertMilliTimeWithPrecision( - System.currentTimeMillis(), - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), + System.currentTimeMillis(), createTopicReq.getTopicAttributes()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index e088fa0bf42..3a1a7395c8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -243,6 +243,7 @@ import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.trigger.api.Trigger; @@ -2016,14 +2017,31 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final String topicName = createTopicStatement.getTopicName(); final Map<String, String> topicAttributes = createTopicStatement.getTopicAttributes(); + // Replace now value with current time (raw timestamp based on system timestamp precision) + final long currentTime = + CommonDateTimeUtils.convertMilliTimeWithPrecision( + System.currentTimeMillis(), + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + topicAttributes.computeIfPresent( + TopicConstant.START_TIME_KEY, + (k, v) -> { + if (TopicConstant.NOW_TIME_VALUE.equals(v)) { + return String.valueOf(currentTime); + } + return v; + }); + topicAttributes.computeIfPresent( + TopicConstant.END_TIME_KEY, + (k, v) -> { + if (TopicConstant.NOW_TIME_VALUE.equals(v)) { + return String.valueOf(currentTime); + } + return v; + }); + // Validate topic config final TopicMeta temporaryTopicMeta = - new TopicMeta( - topicName, - CommonDateTimeUtils.convertMilliTimeWithPrecision( - System.currentTimeMillis(), - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), - topicAttributes); + new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes); try { PipeDataNodeAgent.plugin() .validateExtractor(temporaryTopicMeta.generateExtractorAttributes()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index 14c48c85da3..e3bde8a0017 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -39,7 +39,7 @@ import java.util.Set; public class TopicMeta { private String topicName; - private long creationTime; // raw timestamp based on system timestamp precision + private long creationTime; // unit in ms private TopicConfig config; private Set<String> subscribedConsumerGroupIds; @@ -182,7 +182,7 @@ public class TopicMeta { // path extractorAttributes.putAll(config.getAttributesWithPathOrPattern()); // time - extractorAttributes.putAll(config.getAttributesWithTimeRange(creationTime)); + extractorAttributes.putAll(config.getAttributesWithTimeRange()); // realtime mode extractorAttributes.putAll(config.getAttributesWithRealtimeMode()); // source mode
