This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 626840c9a61 Pipe: CreatePipe and AlterPipe support now function
(#14014) (#14037)
626840c9a61 is described below
commit 626840c9a61b8d93b0b1dd9ab3f1acef3005a4f7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Nov 12 12:56:12 2024 +0800
Pipe: CreatePipe and AlterPipe support now function (#14014) (#14037)
---
.../pipe/it/autocreate/PipeNowFunctionIT.java | 281 +++++++++++++++++++++
.../execution/config/sys/pipe/AlterPipeTask.java | 39 +++
.../execution/config/sys/pipe/CreatePipeTask.java | 39 +++
.../config/sys/pipe/PipeFunctionSupport.java | 85 +++++++
.../config/constant/PipeExtractorConstant.java | 1 +
5 files changed, 445 insertions(+)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipeNowFunctionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipeNowFunctionIT.java
new file mode 100644
index 00000000000..16b9cb05df0
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipeNowFunctionIT.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test checking that pipes can be created and altered when the extractor time-range
+ * attributes ({@code start-time}, {@code end-time}, {@code history.start-time} and {@code
+ * history.end-time}) are set to {@code now}, using the {@code source.}-prefixed, the {@code
+ * extractor.}-prefixed and the un-prefixed spelling of each key.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class PipeNowFunctionIT extends AbstractPipeDualAutoIT {
+
+  /** Creates and alters pipes through the config node client with "now" time attributes. */
+  @Test
+  public void testPipeNowFunction() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      final Map<String, String> connectorAttributes = new HashMap<>();
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+      // p1: "source."-prefixed keys.
+      Map<String, String> extractorAttributes = nowTimeRangeAttributes("source.");
+      extractorAttributes.put("source.history.enable", "true");
+      createPipeAndAssertSuccess(client, "p1", extractorAttributes, connectorAttributes);
+
+      // p2: un-prefixed keys.
+      extractorAttributes = nowTimeRangeAttributes("");
+      extractorAttributes.put("history.enable", "true");
+      createPipeAndAssertSuccess(client, "p2", extractorAttributes, connectorAttributes);
+
+      // p3: "extractor."-prefixed time keys combined with the un-prefixed enable switch.
+      extractorAttributes = nowTimeRangeAttributes("extractor.");
+      extractorAttributes.put("history.enable", "true");
+      createPipeAndAssertSuccess(client, "p3", extractorAttributes, connectorAttributes);
+
+      assertPipeRunning(client, "p1");
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+      assertPipeRunning(client, "p1");
+
+      alterExtractorAttributes(client, "p1", nowTimeRangeAttributes("extractor."));
+      assertPipeRunning(client, "p1");
+
+      alterExtractorAttributes(client, "p1", nowTimeRangeAttributes(""));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.stopPipe("p1").getCode());
+
+      // After stopping, only the existence of p1 is checked, not its state.
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+      Assert.assertTrue(showPipeResult.stream().anyMatch(o -> o.id.equals("p1")));
+    }
+  }
+
+  /** Creates and alters pipes through SQL statements with "now" time attributes. */
+  @Test
+  public void testTreeModeSQLSupportNowFunc() {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    // The connector clause is identical for every pipe created below.
+    final String connectorClause =
+        String.format(
+            " with connector ("
+                + "'connector'='iotdb-thrift-connector',"
+                + "'connector.ip'='%s',"
+                + "'connector.port'='%s',"
+                + "'connector.batch.enable'='false')",
+            receiverIp, receiverPort);
+
+    executeOnSender(
+        "create pipe p1"
+            + " with extractor ("
+            + "'extractor.history.enable'='true',"
+            + "'source.start-time'='now',"
+            + "'source.end-time'='now',"
+            + "'source.history.start-time'='now',"
+            + "'source.history.end-time'='now')"
+            + connectorClause);
+
+    executeOnSender(
+        "create pipe p2"
+            + " with extractor ("
+            + "'extractor.history.enable'='true',"
+            + "'start-time'='now',"
+            + "'end-time'='now',"
+            + "'history.start-time'='now',"
+            + "'history.end-time'='now')"
+            + connectorClause);
+
+    executeOnSender(
+        "create pipe p3"
+            + " with extractor ("
+            + "'extractor.history.enable'='true',"
+            + "'extractor.start-time'='now',"
+            + "'extractor.end-time'='now',"
+            + "'extractor.history.start-time'='now',"
+            + "'extractor.history.end-time'='now')"
+            + connectorClause);
+
+    executeOnSender(
+        "alter pipe p3"
+            + " modify extractor ("
+            + "'history.enable'='true',"
+            + "'start-time'='now',"
+            + "'end-time'='now',"
+            + "'history.start-time'='now',"
+            + "'history.end-time'='now')");
+
+    executeOnSender(
+        "alter pipe p3"
+            + " modify extractor ("
+            + "'extractor.history.enable'='true',"
+            + "'extractor.start-time'='now',"
+            + "'extractor.end-time'='now',"
+            + "'extractor.history.start-time'='now',"
+            + "'extractor.history.end-time'='now')");
+
+    executeOnSender(
+        "alter pipe p3"
+            + " modify source ("
+            + "'extractor.history.enable'='true',"
+            + "'source.start-time'='now',"
+            + "'source.end-time'='now',"
+            + "'source.history.start-time'='now',"
+            + "'source.history.end-time'='now')");
+  }
+
+  /**
+   * Builds a fresh, mutable attribute map whose four time-range keys are all set to "now".
+   *
+   * @param keyPrefix prefix prepended to every key, e.g. {@code "source."}, {@code "extractor."}
+   *     or the empty string
+   */
+  private static Map<String, String> nowTimeRangeAttributes(final String keyPrefix) {
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put(keyPrefix + "start-time", "now");
+    attributes.put(keyPrefix + "end-time", "now");
+    attributes.put(keyPrefix + "history.start-time", "now");
+    attributes.put(keyPrefix + "history.end-time", "now");
+    return attributes;
+  }
+
+  /** Creates a pipe with empty processor attributes and asserts that the call succeeded. */
+  private static void createPipeAndAssertSuccess(
+      final SyncConfigNodeIServiceClient client,
+      final String pipeName,
+      final Map<String, String> extractorAttributes,
+      final Map<String, String> connectorAttributes)
+      throws Exception {
+    final TSStatus status =
+        client.createPipe(
+            new TCreatePipeReq(pipeName, connectorAttributes)
+                .setExtractorAttributes(extractorAttributes)
+                .setProcessorAttributes(new HashMap<>()));
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+  }
+
+  /**
+   * Merges the given extractor attributes into an existing pipe, leaving processor and connector
+   * attributes untouched. The returned status is not asserted here; callers check the pipe state
+   * afterwards instead.
+   */
+  private static void alterExtractorAttributes(
+      final SyncConfigNodeIServiceClient client,
+      final String pipeName,
+      final Map<String, String> extractorAttributes)
+      throws Exception {
+    client.alterPipe(
+        new TAlterPipeReq()
+            .setPipeName(pipeName)
+            .setExtractorAttributes(extractorAttributes)
+            .setIsReplaceAllExtractorAttributes(false)
+            .setProcessorAttributes(new HashMap<>())
+            .setIsReplaceAllProcessorAttributes(false)
+            .setConnectorAttributes(new HashMap<>())
+            .setIsReplaceAllConnectorAttributes(false));
+  }
+
+  /** Asserts that "show pipes" lists a pipe with the given id in state RUNNING. */
+  private static void assertPipeRunning(
+      final SyncConfigNodeIServiceClient client, final String pipeName) throws Exception {
+    final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+    Assert.assertTrue(
+        showPipeResult.stream()
+            .anyMatch(o -> o.id.equals(pipeName) && o.state.equals("RUNNING")));
+  }
+
+  /**
+   * Runs one SQL statement on a new sender-side connection and fails the test if it throws a
+   * {@link SQLException}. The failing statement is included in the failure message.
+   */
+  private void executeOnSender(final String sql) {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (final SQLException e) {
+      fail("Failed to execute \"" + sql + "\": " + e.getMessage());
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
index 104501ae23f..c01c8fb1a35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/AlterPipeTask.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
@@ -26,11 +29,15 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeSta
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Map;
+
public class AlterPipeTask implements IConfigTask {
private final AlterPipeStatement alterPipeStatement;
public AlterPipeTask(AlterPipeStatement alterPipeStatement) {
+ // Replace "now" values of the extractor time attributes with the current timestamp before
+ // the statement is stored; the statement's own attribute map is modified in place.
+
applyNowFunctionToExtractorAttributes(alterPipeStatement.getExtractorAttributes());
this.alterPipeStatement = alterPipeStatement;
}
@@ -39,4 +46,36 @@ public class AlterPipeTask implements IConfigTask {
throws InterruptedException {
return configTaskExecutor.alterPipe(alterPipeStatement);
}
+
+  /**
+   * Replaces "now" values of the extractor start/end time attributes (realtime and history) with
+   * one shared timestamp. The timestamp is {@link System#currentTimeMillis()} passed through
+   * {@code CommonDateTimeUtils.convertMilliTimeWithPrecision} together with the configured
+   * timestamp precision, so all four attributes resolve to the same instant.
+   *
+   * @param attributes extractor attributes of the statement; updated in place
+   */
+  private void applyNowFunctionToExtractorAttributes(final Map<String, String> attributes) {
+    final long nowMillis = System.currentTimeMillis();
+    final long resolvedNow =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            nowMillis, CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+    // Each row is {source-style key, extractor-style key} for one time attribute.
+    final String[][] timeKeyPairs = {
+      {PipeExtractorConstant.SOURCE_START_TIME_KEY, PipeExtractorConstant.EXTRACTOR_START_TIME_KEY},
+      {PipeExtractorConstant.SOURCE_END_TIME_KEY, PipeExtractorConstant.EXTRACTOR_END_TIME_KEY},
+      {
+        PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY
+      },
+      {
+        PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY
+      }
+    };
+
+    for (final String[] keyPair : timeKeyPairs) {
+      PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+          attributes, keyPair[0], keyPair[1], resolvedNow);
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index f0213c7d4f7..1c6e2b98516 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
@@ -26,11 +29,15 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeSt
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Map;
+
public class CreatePipeTask implements IConfigTask {
private final CreatePipeStatement createPipeStatement;
public CreatePipeTask(CreatePipeStatement createPipeStatement) {
+ // Replace "now" values of the extractor time attributes with the current timestamp before
+ // the statement is stored; the statement's own attribute map is modified in place.
+
applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes());
this.createPipeStatement = createPipeStatement;
}
@@ -39,4 +46,36 @@ public class CreatePipeTask implements IConfigTask {
throws InterruptedException {
return configTaskExecutor.createPipe(createPipeStatement);
}
+
+  /**
+   * Replaces "now" values of the extractor start/end time attributes (realtime and history) with
+   * one shared timestamp. The timestamp is {@link System#currentTimeMillis()} passed through
+   * {@code CommonDateTimeUtils.convertMilliTimeWithPrecision} together with the configured
+   * timestamp precision, so all four attributes resolve to the same instant.
+   *
+   * @param attributes extractor attributes of the statement; updated in place
+   */
+  private void applyNowFunctionToExtractorAttributes(final Map<String, String> attributes) {
+    final long nowMillis = System.currentTimeMillis();
+    final long resolvedNow =
+        CommonDateTimeUtils.convertMilliTimeWithPrecision(
+            nowMillis, CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+    // Each row is {source-style key, extractor-style key} for one time attribute.
+    final String[][] timeKeyPairs = {
+      {PipeExtractorConstant.SOURCE_START_TIME_KEY, PipeExtractorConstant.EXTRACTOR_START_TIME_KEY},
+      {PipeExtractorConstant.SOURCE_END_TIME_KEY, PipeExtractorConstant.EXTRACTOR_END_TIME_KEY},
+      {
+        PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY
+      },
+      {
+        PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY,
+        PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY
+      }
+    };
+
+    for (final String[] keyPair : timeKeyPairs) {
+      PipeFunctionSupport.applyNowFunctionToExtractorAttributes(
+          attributes, keyPair[0], keyPair[1], resolvedNow);
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
new file mode 100644
index 00000000000..c5dbf3056cd
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Static helpers that expand function-like placeholder values in pipe attributes. The only
+ * placeholder handled here is {@link PipeExtractorConstant#NOW_TIME_VALUE}, which is replaced by
+ * a caller-supplied timestamp.
+ */
+public class PipeFunctionSupport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeFunctionSupport.class);
+
+  /** Prefix stripped from a source key to obtain its un-prefixed alias. */
+  private static final String SOURCE_KEY_PREFIX = "source.";
+
+  /**
+   * Replaces the value of one extractor attribute with {@code currentTime} if that value is the
+   * "now" placeholder (compared case-insensitively after trimming).
+   *
+   * <p>The attribute is looked up under three aliases, in this order: {@code sourceKey}, {@code
+   * sourceKey} without its {@code "source."} prefix, and {@code extractorKey}. Only the first
+   * alias that is present is examined; if its value is not "now", nothing is changed.
+   *
+   * @param extractorAttributes attribute map to update in place; {@code null} is a no-op
+   * @param sourceKey the {@code "source."}-prefixed attribute key
+   * @param extractorKey the {@code "extractor."}-style attribute key
+   * @param currentTime timestamp written (as a decimal string) in place of "now"
+   */
+  public static void applyNowFunctionToExtractorAttributes(
+      final Map<String, String> extractorAttributes,
+      final String sourceKey,
+      final String extractorKey,
+      final long currentTime) {
+    if (extractorAttributes == null) {
+      // Nothing to rewrite; avoids a NullPointerException when no attributes were given.
+      return;
+    }
+
+    final Pair<String, String> pair =
+        getExtractorAttributesKeyAndValue(extractorAttributes, sourceKey, extractorKey);
+
+    if (pair == null) {
+      return;
+    }
+    if (isNowFunction(pair.right)) {
+      extractorAttributes.replace(pair.left, String.valueOf(currentTime));
+    }
+  }
+
+  /**
+   * Finds the first alias of an attribute that has a non-null value.
+   *
+   * @return the matching key and its value, or {@code null} if no alias is present
+   */
+  private static Pair<String, String> getExtractorAttributesKeyAndValue(
+      final Map<String, String> extractorAttributes,
+      final String sourceKey,
+      final String extractorKey) {
+    String value = extractorAttributes.get(sourceKey);
+    if (value != null) {
+      return new Pair<>(sourceKey, value);
+    }
+
+    if (sourceKey != null && sourceKey.startsWith(SOURCE_KEY_PREFIX)) {
+      final String unprefixedKey = sourceKey.substring(SOURCE_KEY_PREFIX.length());
+      value = extractorAttributes.get(unprefixedKey);
+      if (value != null) {
+        return new Pair<>(unprefixedKey, value);
+      }
+    } else {
+      // Do not strip characters from a key that lacks the prefix; that would produce an
+      // unrelated lookup key. Report it and fall through to the extractor alias.
+      LOGGER.warn(
+          "The prefix of sourceKey is not 'source.'. Please check the parameters passed in: {}",
+          sourceKey);
+    }
+
+    value = extractorAttributes.get(extractorKey);
+    if (value != null) {
+      return new Pair<>(extractorKey, value);
+    }
+    return null;
+  }
+
+  /** Returns whether {@code value}, ignoring case and surrounding whitespace, is "now". */
+  private static boolean isNowFunction(final String value) {
+    return PipeExtractorConstant.NOW_TIME_VALUE.equalsIgnoreCase(value.trim());
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 79086799fcd..7a9a3c13556 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -96,6 +96,7 @@ public class PipeExtractorConstant {
public static final String SOURCE_START_TIME_KEY = "source.start-time";
public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time";
public static final String SOURCE_END_TIME_KEY = "source.end-time";
+ // Placeholder value for the time attributes; PipeFunctionSupport matches it case-insensitively
+ // (after trimming) and substitutes the current timestamp.
+ public static final String NOW_TIME_VALUE = "now";
public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY =
"extractor.watermark-interval-ms";
public static final String SOURCE_WATERMARK_INTERVAL_KEY =
"source.watermark-interval-ms";