This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 626840c9a61 Pipe: CreatePipe and AlterPipe support now function 
(#14014) (#14037)
626840c9a61 is described below

commit 626840c9a61b8d93b0b1dd9ab3f1acef3005a4f7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Nov 12 12:56:12 2024 +0800

    Pipe: CreatePipe and AlterPipe support now function (#14014) (#14037)
---
 .../pipe/it/autocreate/PipeNowFunctionIT.java      | 281 +++++++++++++++++++++
 .../execution/config/sys/pipe/AlterPipeTask.java   |  39 +++
 .../execution/config/sys/pipe/CreatePipeTask.java  |  39 +++
 .../config/sys/pipe/PipeFunctionSupport.java       |  85 +++++++
 .../config/constant/PipeExtractorConstant.java     |   1 +
 5 files changed, 445 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipeNowFunctionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipeNowFunctionIT.java
new file mode 100644
index 00000000000..16b9cb05df0
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipeNowFunctionIT.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class PipeNowFunctionIT extends AbstractPipeDualAutoIT {
+
+  @Test
+  public void testPipeNowFunction() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("source.start-time", "now");
+      extractorAttributes.put("source.end-time", "now");
+      extractorAttributes.put("source.history.start-time", "now");
+      extractorAttributes.put("source.history.end-time", "now");
+      extractorAttributes.put("source.history.enable", "true");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      extractorAttributes.clear();
+      extractorAttributes.put("start-time", "now");
+      extractorAttributes.put("end-time", "now");
+      extractorAttributes.put("history.start-time", "now");
+      extractorAttributes.put("history.end-time", "now");
+      extractorAttributes.put("history.enable", "true");
+
+      status =
+          client.createPipe(
+              new TCreatePipeReq("p2", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      extractorAttributes.clear();
+      extractorAttributes.put("extractor.start-time", "now");
+      extractorAttributes.put("extractor.end-time", "now");
+      extractorAttributes.put("extractor.history.start-time", "now");
+      extractorAttributes.put("extractor.history.end-time", "now");
+      extractorAttributes.put("history.enable", "true");
+
+      status =
+          client.createPipe(
+              new TCreatePipeReq("p3", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertTrue(
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertTrue(
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
+
+      extractorAttributes.clear();
+      extractorAttributes.put("extractor.start-time", "now");
+      extractorAttributes.put("extractor.end-time", "now");
+      extractorAttributes.put("extractor.history.start-time", "now");
+      extractorAttributes.put("extractor.history.end-time", "now");
+      client.alterPipe(
+          new TAlterPipeReq()
+              .setPipeName("p1")
+              .setExtractorAttributes(extractorAttributes)
+              .setIsReplaceAllExtractorAttributes(false)
+              .setProcessorAttributes(new HashMap<>())
+              .setIsReplaceAllProcessorAttributes(false)
+              .setConnectorAttributes(new HashMap<>())
+              .setIsReplaceAllConnectorAttributes(false));
+
+      showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertTrue(
+          showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") && 
o.state.equals("RUNNING")));
+
+      extractorAttributes.clear();
+      extractorAttributes.put("start-time", "now");
+      extractorAttributes.put("end-time", "now");
+      extractorAttributes.put("history.start-time", "now");
+      extractorAttributes.put("history.end-time", "now");
+      client.alterPipe(
+          new TAlterPipeReq()
+              .setPipeName("p1")
+              .setExtractorAttributes(extractorAttributes)
+              .setIsReplaceAllExtractorAttributes(false)
+              .setProcessorAttributes(new HashMap<>())
+              .setIsReplaceAllProcessorAttributes(false)
+              .setConnectorAttributes(new HashMap<>())
+              .setIsReplaceAllConnectorAttributes(false));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.stopPipe("p1").getCode());
+
+      showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertTrue(showPipeResult.stream().anyMatch((o) -> 
o.id.equals("p1")));
+    }
+  }
+
+  @Test
+  public void testTreeModeSQLSupportNowFunc() {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    final String p1 =
+        String.format(
+            "create pipe p1"
+                + " with extractor ("
+                + "'extractor.history.enable'='true',"
+                + "'source.start-time'='now',"
+                + "'source.end-time'='now',"
+                + "'source.history.start-time'='now',"
+                + "'source.history.end-time'='now')"
+                + " with connector ("
+                + "'connector'='iotdb-thrift-connector',"
+                + "'connector.ip'='%s',"
+                + "'connector.port'='%s',"
+                + "'connector.batch.enable'='false')",
+            receiverIp, receiverPort);
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(p1);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    final String p2 =
+        String.format(
+            "create pipe p2"
+                + " with extractor ("
+                + "'extractor.history.enable'='true',"
+                + "'start-time'='now',"
+                + "'end-time'='now',"
+                + "'history.start-time'='now',"
+                + "'history.end-time'='now')"
+                + " with connector ("
+                + "'connector'='iotdb-thrift-connector',"
+                + "'connector.ip'='%s',"
+                + "'connector.port'='%s',"
+                + "'connector.batch.enable'='false')",
+            receiverIp, receiverPort);
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(p2);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    final String p3 =
+        String.format(
+            "create pipe p3"
+                + " with extractor ("
+                + "'extractor.history.enable'='true',"
+                + "'extractor.start-time'='now',"
+                + "'extractor.end-time'='now',"
+                + "'extractor.history.start-time'='now',"
+                + "'extractor.history.end-time'='now')"
+                + " with connector ("
+                + "'connector'='iotdb-thrift-connector',"
+                + "'connector.ip'='%s',"
+                + "'connector.port'='%s',"
+                + "'connector.batch.enable'='false')",
+            receiverIp, receiverPort);
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(p3);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    String alterP3 =
+        "alter pipe p3"
+            + " modify extractor ("
+            + "'history.enable'='true',"
+            + "'start-time'='now',"
+            + "'end-time'='now',"
+            + "'history.start-time'='now',"
+            + "'history.end-time'='now')";
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(alterP3);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    alterP3 =
+        "alter pipe p3"
+            + " modify extractor ("
+            + "'extractor.history.enable'='true',"
+            + "'extractor.start-time'='now',"
+            + "'extractor.end-time'='now',"
+            + "'extractor.history.start-time'='now',"
+            + "'extractor.history.end-time'='now')";
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(alterP3);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    alterP3 =
+        "alter pipe p3"
+            + " modify source ("
+            + "'extractor.history.enable'='true',"
+            + "'source.start-time'='now',"
+            + "'source.end-time'='now',"
+            + "'source.history.start-time'='now',"
+            + "'source.history.end-time'='now')";
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(alterP3);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index 104501ae23f..c01c8fb1a35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
@@ -26,11 +29,15 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeSta
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.Map;
+
 public class AlterPipeTask implements IConfigTask {
 
   private final AlterPipeStatement alterPipeStatement;
 
   public AlterPipeTask(AlterPipeStatement alterPipeStatement) {
+    // support now() function
+    
applyNowFunctionToExtractorAttributes(alterPipeStatement.getExtractorAttributes());
     this.alterPipeStatement = alterPipeStatement;
   }
 
@@ -39,4 +46,36 @@ public class AlterPipeTask implements IConfigTask {
       throws InterruptedException {
     return configTaskExecutor.alterPipe(alterPipeStatement);
   }
+
+  private void applyNowFunctionToExtractorAttributes(final Map<String, String> 
attributes) {
+    final long currentTime =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            System.currentTimeMillis(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+    // support now() function
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_START_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_START_TIME_KEY,
+        currentTime);
+
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_END_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_END_TIME_KEY,
+        currentTime);
+
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY,
+        currentTime);
+
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY,
+        currentTime);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index f0213c7d4f7..1c6e2b98516 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
@@ -26,11 +29,15 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeSt
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.Map;
+
 public class CreatePipeTask implements IConfigTask {
 
   private final CreatePipeStatement createPipeStatement;
 
   public CreatePipeTask(CreatePipeStatement createPipeStatement) {
+    // support now() function
+    
applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes());
     this.createPipeStatement = createPipeStatement;
   }
 
@@ -39,4 +46,36 @@ public class CreatePipeTask implements IConfigTask {
       throws InterruptedException {
     return configTaskExecutor.createPipe(createPipeStatement);
   }
+
+  private void applyNowFunctionToExtractorAttributes(final Map<String, String> 
attributes) {
+    final long currentTime =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            System.currentTimeMillis(),
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+    // support now() function
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_START_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_START_TIME_KEY,
+        currentTime);
+
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_END_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_END_TIME_KEY,
+        currentTime);
+
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY,
+        currentTime);
+
+    PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+        attributes,
+        PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY,
+        currentTime);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
new file mode 100644
index 00000000000..c5dbf3056cd
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class PipeFunctionSupport {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeFunctionSupport.class);
+
+  public static void applyNowFunctionToExtractorAttributes(
+      final Map<String, String> extractorAttributes,
+      final String sourceKey,
+      final String extractorKey,
+      final long currentTime) {
+    final Pair<String, String> pair =
+        getExtractorAttributesKeyAndValue(extractorAttributes, sourceKey, 
extractorKey);
+
+    if (pair == null) {
+      return;
+    }
+    if (isNowFunction(pair.right)) {
+      extractorAttributes.replace(pair.left, String.valueOf(currentTime));
+    }
+  }
+
+  private static Pair<String, String> getExtractorAttributesKeyAndValue(
+      final Map<String, String> extractorAttributes,
+      final String sourceKey,
+      final String extractorKey) {
+    String key = sourceKey;
+    String value = extractorAttributes.get(key);
+    if (value != null) {
+      return new Pair<>(key, value);
+    }
+
+    // "source.".length() == 7
+    try {
+      key = sourceKey.substring(7);
+      value = extractorAttributes.get(key);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "The prefix of sourceKey is not 'source.'. Please check the 
parameters passed in: {}",
+          sourceKey,
+          e);
+    }
+    if (value != null) {
+      return new Pair<>(key, value);
+    }
+
+    key = extractorKey;
+    value = extractorAttributes.get(key);
+    if (value != null) {
+      return new Pair<>(key, value);
+    }
+    return null;
+  }
+
+  private static boolean isNowFunction(final String value) {
+    return PipeExtractorConstant.NOW_TIME_VALUE.equalsIgnoreCase(value.trim());
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 79086799fcd..7a9a3c13556 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -96,6 +96,7 @@ public class PipeExtractorConstant {
   public static final String SOURCE_START_TIME_KEY = "source.start-time";
   public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
   public static final String SOURCE_END_TIME_KEY = "source.end-time";
+  public static final String NOW_TIME_VALUE = "now";
 
   public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = 
"extractor.watermark-interval-ms";
   public static final String SOURCE_WATERMARK_INTERVAL_KEY = 
"source.watermark-interval-ms";

Reply via email to