This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4fa35d57cd0 Pipe: Implement down-sampling-processor plugin for
reducing the amount of data transferred during data sync (#11557)
4fa35d57cd0 is described below
commit 4fa35d57cd04d5198e48d39e8790bf1c9fab92c9
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 28 18:11:06 2023 +0800
Pipe: Implement down-sampling-processor plugin for reducing the amount of
data transferred during data sync (#11557)
This commit introduces the down-sampling-processor plugin, which guarantees
the following:
1. Memory permitting, a tablet row can only be transferred iff there
exists a measurement whose timestamp exceeds the last recorded timestamp by at
least the specified time interval.
2. A file won't be parsed unless down-sampling.split-file is configured to
true.
The plugin's parameters are as follows:
| key | value | value range | required or optional with default
|
| -------- | -------- | -------- | -------- |
| processor | down-sampling-processor | down-sampling-processor | required
|
| processor.down-sampling.interval-seconds | Sampling interval of specified
tablets | long | optional : 60 |
| processor.down-sampling.split-file | Whether to split the file during
processing | boolean | optional : false |
| processor.down-sampling.memory-limit-in-bytes | Initial memory for the
down-sampling-processor cache | long | optional : 16 * 1024 * 1024 |
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 12 +-
.../iotdb/pipe/PipeEnvironmentException.java | 12 +-
.../apache/iotdb/pipe/it/AbstractPipeDualIT.java | 60 ++++++
.../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java | 65 +++----
.../pipe/it/IoTDBPipeConnectorParallelIT.java | 28 +--
.../apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java | 29 +--
.../apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java | 28 +--
.../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java | 29 +--
...orParallelIT.java => IoTDBPipeProcessorIT.java} | 68 +++----
.../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java | 27 ++-
.../iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java | 16 +-
.../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java | 29 +--
.../apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java | 29 +--
.../java/org/apache/iotdb/pipe/api/access/Row.java | 10 +-
.../agent/plugin/PipeProcessorConstructor.java | 3 +
.../config/constant/PipeProcessorConstant.java | 12 ++
.../env/PipeTaskConnectorRuntimeEnvironment.java | 9 +-
.../env/PipeTaskExtractorRuntimeEnvironment.java | 9 +-
...va => PipeTaskProcessorRuntimeEnvironment.java} | 13 +-
.../plugin/env/PipeTaskRuntimeEnvironment.java | 8 +-
.../iotdb/db/pipe/event/common/row/PipeRow.java | 5 +
.../downsampling/DownSamplingProcessor.java | 213 +++++++++++++++++++++
.../downsampling/PartialPathLastTimeCache.java | 111 +++++++++++
.../db/pipe/resource/memory/PipeMemoryBlock.java | 14 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 6 +-
.../dataregion/wal/utils/WALInsertNodeCache.java | 8 +-
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 47 +++--
.../builtin/processor/DownSamplingProcessor.java} | 17 +-
.../builtin/processor/PlaceHolderProcessor.java | 68 +++++++
.../org/apache/iotdb/tsfile/utils/BytesUtils.java | 10 +
30 files changed, 653 insertions(+), 342 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index f7aac47b65f..3fa35d05f25 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -414,11 +414,13 @@ public class TestUtils {
// This method will not throw failure given that a failure is encountered.
// Instead, it return a flag to indicate the result of the execution.
public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env,
List<String> sqlList) {
+ int lastIndex = 0;
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
- for (String sql : sqlList) {
- statement.execute(sql);
+ for (int i = lastIndex; i < sqlList.size(); ++i) {
+ statement.execute(sqlList.get(i));
+ lastIndex = i;
}
return true;
} catch (SQLException e) {
@@ -483,11 +485,13 @@ public class TestUtils {
public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
BaseEnv env, DataNodeWrapper wrapper, List<String> sqlList) {
+ int lastIndex = 0;
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
try (Connection connection =
env.getConnectionWithSpecifiedDataNode(wrapper);
Statement statement = connection.createStatement()) {
- for (String sql : sqlList) {
- statement.execute(sql);
+ for (int i = lastIndex; i < sqlList.size(); ++i) {
+ statement.execute(sqlList.get(i));
+ lastIndex = i;
}
return true;
} catch (SQLException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/PipeEnvironmentException.java
similarity index 75%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
copy to
integration-test/src/test/java/org/apache/iotdb/pipe/PipeEnvironmentException.java
index 8caa87c03ec..5788e89167a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/PipeEnvironmentException.java
@@ -17,13 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config.constant;
+package org.apache.iotdb.pipe;
-public class PipeProcessorConstant {
+public class PipeEnvironmentException extends RuntimeException {
- public static final String PROCESSOR_KEY = "processor";
+ public PipeEnvironmentException(String message) {
+ super(message);
+ }
- private PipeProcessorConstant() {
- throw new IllegalStateException("Utility class");
+ public PipeEnvironmentException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/AbstractPipeDualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/AbstractPipeDualIT.java
new file mode 100644
index 00000000000..62d87ac9f1f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/AbstractPipeDualIT.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it;
+
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
+
+import org.junit.After;
+import org.junit.Before;
+
+abstract class AbstractPipeDualIT {
+
+ protected BaseEnv senderEnv;
+ protected BaseEnv receiverEnv;
+
+ @Before
+ public void setUp() throws PipeEnvironmentException {
+ try {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ } catch (Exception | Error e) {
+ throw new PipeEnvironmentException(e.getMessage(), e);
+ }
+ }
+
+ @After
+ public void tearDown() throws PipeEnvironmentException {
+ try {
+ senderEnv.cleanClusterEnvironment();
+ receiverEnv.cleanClusterEnvironment();
+ } catch (Exception | Error e) {
+ throw new PipeEnvironmentException(e.getMessage(), e);
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 8ead12976c2..91a4c13d1f9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -35,11 +35,10 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -61,41 +60,37 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeClusterIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
+public class IoTDBPipeClusterIT extends AbstractPipeDualIT {
+ @Override
@Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv
- .getConfig()
- .getCommonConfig()
- .setAutoCreateSchemaEnabled(true)
- .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
-
- receiverEnv
- .getConfig()
- .getCommonConfig()
- .setAutoCreateSchemaEnabled(true)
- .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
-
- senderEnv.initClusterEnvironment(3, 3, 180);
- receiverEnv.initClusterEnvironment(3, 3, 180);
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
+ public void setUp() throws PipeEnvironmentException {
+ try {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+
+ senderEnv.initClusterEnvironment(3, 3, 180);
+ receiverEnv.initClusterEnvironment(3, 3, 180);
+ } catch (Exception | Error e) {
+ throw new PipeEnvironmentException(e.getMessage(), e);
+ }
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
index 271957d61f9..161cfbfd4da 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,29 +41,7 @@ import java.util.Set;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeConnectorParallelIT {
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeConnectorParallelIT extends AbstractPipeDualIT {
@Test
public void testIoTConnectorParallel() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
index 52499cd14d4..65eb2a20f28 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -44,30 +40,7 @@ import java.util.Map;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeDataSinkIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeDataSinkIT extends AbstractPipeDualIT {
@Test
public void testThriftConnector() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index a8f0828bfa9..7d183548e65 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -25,16 +25,13 @@ import
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -53,30 +50,7 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeExtractorIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeExtractorIT extends AbstractPipeDualIT {
@Test
public void testExtractorValidParameter() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index 2cd8acc60c2..8226d25969d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -47,30 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeLifeCycleIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeLifeCycleIT extends AbstractPipeDualIT {
@Test
public void testLifeCycleWithHistoryEnabled() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProcessorIT.java
similarity index 62%
copy from
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
copy to
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProcessorIT.java
index 271957d61f9..42ffd2e8d74 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProcessorIT.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,48 +41,30 @@ import java.util.Set;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeConnectorParallelIT {
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeProcessorIT extends AbstractPipeDualIT {
@Test
- public void testIoTConnectorParallel() throws Exception {
+ public void testDownSamplingProcessor() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
String receiverIp = receiverDataNode.getIp();
int receiverPort = receiverDataNode.getPort();
- Set<String> expectedResSet = new HashSet<>();
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();
- connectorAttributes.put("connector", "iotdb-thrift-connector");
- connectorAttributes.put("connector.batch.enable", "false");
- connectorAttributes.put("connector.ip", receiverIp);
- connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
- connectorAttributes.put("connector.parallel.tasks", "3");
+ extractorAttributes.put("source.realtime.mode", "log");
+
+ processorAttributes.put("processor", "down-sampling-processor");
+ processorAttributes.put("processor.down-sampling.interval-seconds",
"20");
+ processorAttributes.put("processor.down-sampling.split-file", "true");
+
+ connectorAttributes.put("sink", "iotdb-thrift-sink");
+ connectorAttributes.put("sink.batch.enable", "false");
+ connectorAttributes.put("sink.ip", receiverIp);
+ connectorAttributes.put("sink.port", Integer.toString(receiverPort));
TSStatus status =
client.createPipe(
@@ -99,22 +77,28 @@ public class IoTDBPipeConnectorParallelIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
- "insert into root.sg1.d1(time, s1) values (0, 1)",
- "insert into root.sg1.d1(time, s1) values (1, 2)",
- "insert into root.sg1.d1(time, s1) values (2, 3)",
- "insert into root.sg1.d1(time, s1) values (3, 4)"))) {
+ "insert into root.vehicle.d0(time, s1) values (0, 1)",
+ "insert into root.vehicle.d0(time, s1) values (10000, 2)",
+ "insert into root.vehicle.d0(time, s1) values (19999, 3)",
+ "insert into root.vehicle.d0(time, s1) values (20000, 4)",
+ "insert into root.vehicle.d0(time, s1) values (20001, 5)",
+ "insert into root.vehicle.d0(time, s1) values (45000, 6)"))) {
return;
}
+ Set<String> expectedResSet = new HashSet<>();
+
expectedResSet.add("0,1.0,");
- expectedResSet.add("1,2.0,");
- expectedResSet.add("2,3.0,");
- expectedResSet.add("3,4.0,");
+ expectedResSet.add("20000,4.0,");
+ expectedResSet.add("45000,6.0,");
+
TestUtils.assertDataOnEnv(
- receiverEnv, "select * from root.**", "Time,root.sg1.d1.s1,",
expectedResSet);
+ receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,",
expectedResSet);
}
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index 6bcaee39b56..09df263e108 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -29,10 +29,9 @@ import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -47,22 +46,18 @@ import java.util.Map;
/** Test pipe's basic functionalities under multiple cluster and consensus
protocol settings. */
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeProtocolIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
+public class IoTDBPipeProtocolIT extends AbstractPipeDualIT {
+ @Override
@Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
+ public void setUp() throws PipeEnvironmentException {
+ try {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ } catch (Exception | Error e) {
+ throw new PipeEnvironmentException(e.getMessage(), e);
+ }
}
private void innerSetUp(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
index 4894a88a132..4a594730822 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSingleEnvDemoIT.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.PipeEnvironmentException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.After;
@@ -42,14 +43,21 @@ import java.util.Map;
public class IoTDBPipeSingleEnvDemoIT {
@Before
public void setUp() throws Exception {
- MultiEnvFactory.createEnv(1);
-
- MultiEnvFactory.getEnv(0).initClusterEnvironment(1, 1);
+ try {
+ MultiEnvFactory.createEnv(1);
+ MultiEnvFactory.getEnv(0).initClusterEnvironment(1, 1);
+ } catch (Exception e) {
+ throw new PipeEnvironmentException(e.getMessage(), e);
+ }
}
@After
public void tearDown() {
- MultiEnvFactory.getEnv(0).cleanClusterEnvironment();
+ try {
+ MultiEnvFactory.getEnv(0).cleanClusterEnvironment();
+ } catch (Exception e) {
+ throw new PipeEnvironmentException(e.getMessage(), e);
+ }
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index ea46ac87028..6c5838488ca 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -25,16 +25,12 @@ import
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,30 +41,7 @@ import java.util.Map;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeSwitchStatusIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeSwitchStatusIT extends AbstractPipeDualIT {
@Test
public void testPipeSwitchStatus() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
index d29b8d9ea50..58e248dc251 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
@@ -24,16 +24,12 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
-import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
-import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -50,30 +46,7 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
-public class IoTDBPipeSyntaxIT {
-
- private BaseEnv senderEnv;
- private BaseEnv receiverEnv;
-
- @Before
- public void setUp() throws Exception {
- MultiEnvFactory.createEnv(2);
- senderEnv = MultiEnvFactory.getEnv(0);
- receiverEnv = MultiEnvFactory.getEnv(1);
-
- senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
- }
-
- @After
- public void tearDown() {
- senderEnv.cleanClusterEnvironment();
- receiverEnv.cleanClusterEnvironment();
- }
-
+public class IoTDBPipeSyntaxIT extends AbstractPipeDualIT {
@Test
public void testValidPipeName() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
index 333502da43b..e98968b0fef 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
@@ -137,7 +137,7 @@ public interface Row {
int size();
/**
- * Returns the actual column index of the given column name.
+ * Returns the actual column index of the given column (measurement) name.
*
* @param columnName the column name in Path form
* @return the actual column index of the given column name
@@ -145,6 +145,14 @@ public interface Row {
*/
int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
+ /**
+ * Returns the actual column (measurement) name of the given column index. This is the
+ * index-to-name counterpart of {@link #getColumnIndex(Path)}.
+ *
+ * @param columnIndex the column index
+ * @return the actual column (measurement) name of the given column index
+ */
+ String getColumnName(int columnIndex);
+
/**
* Returns the column data types in the Row.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
index abd3e8669f9..68d32dc5529 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeProcessorConstructor.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -35,6 +36,8 @@ public class PipeProcessorConstructor extends
PipePluginConstructor {
protected void initConstructors() {
+ // Each built-in processor is registered under its plugin name, mapped to its constructor.
PLUGIN_CONSTRUCTORS.put(
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(),
DoNothingProcessor::new);
+ PLUGIN_CONSTRUCTORS.put(
+ BuiltinPipePlugin.DOWN_SAMPLING_PROCESSOR.getPipePluginName(),
DownSamplingProcessor::new);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
index 8caa87c03ec..7593fc74f1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
@@ -19,10 +19,22 @@
package org.apache.iotdb.db.pipe.config.constant;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+
public class PipeProcessorConstant {
public static final String PROCESSOR_KEY = "processor";
+ // Parameter key and default (in seconds) for the sampling interval of the
+ // down-sampling processor.
+ public static final String PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY =
+ "processor.down-sampling.interval-seconds";
+ public static final long
PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE = 60;
+ // Parameter key and default for whether TsFile events are split into tablets
+ // during processing.
+ public static final String PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY =
+ "processor.down-sampling.split-file";
+ public static final boolean PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE
= false;
+ // Parameter key and default (in bytes, 16 * MB) for the memory limit of the
+ // down-sampling processor's cache.
+ public static final String PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY
=
+ "processor.down-sampling.memory-limit-in-bytes";
+ public static final long
PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE = 16 * MB;
+
private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
index a8fe2187750..207f0e3e0ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -21,14 +21,7 @@ package org.apache.iotdb.db.pipe.config.plugin.env;
public class PipeTaskConnectorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
- private final int regionId;
-
public PipeTaskConnectorRuntimeEnvironment(String pipeName, long
creationTime, int regionId) {
- super(pipeName, creationTime);
- this.regionId = regionId;
- }
-
- public int getRegionId() {
- return regionId;
+ super(pipeName, creationTime, regionId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
index a726637d5b0..11ac1dfd0d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
@@ -23,21 +23,14 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
public class PipeTaskExtractorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
- private final int regionId;
-
private final PipeTaskMeta pipeTaskMeta;
public PipeTaskExtractorRuntimeEnvironment(
String pipeName, long creationTime, int regionId, PipeTaskMeta
pipeTaskMeta) {
- super(pipeName, creationTime);
- this.regionId = regionId;
+ super(pipeName, creationTime, regionId);
this.pipeTaskMeta = pipeTaskMeta;
}
- public int getRegionId() {
- return regionId;
- }
-
public PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java
similarity index 76%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java
index a8fe2187750..eb6656decc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java
@@ -19,16 +19,9 @@
package org.apache.iotdb.db.pipe.config.plugin.env;
-public class PipeTaskConnectorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
+public class PipeTaskProcessorRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
- private final int regionId;
-
- public PipeTaskConnectorRuntimeEnvironment(String pipeName, long
creationTime, int regionId) {
- super(pipeName, creationTime);
- this.regionId = regionId;
- }
-
- public int getRegionId() {
- return regionId;
+ public PipeTaskProcessorRuntimeEnvironment(String pipeName, long
creationTime, int regionId) {
+ super(pipeName, creationTime, regionId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
index a935dcf2dac..026d2605a95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
@@ -25,10 +25,12 @@ public class PipeTaskRuntimeEnvironment implements
PipeRuntimeEnvironment {
private final String pipeName;
private final long creationTime;
+ private final int regionId;
- public PipeTaskRuntimeEnvironment(String pipeName, long creationTime) {
+ protected PipeTaskRuntimeEnvironment(String pipeName, long creationTime, int
regionId) {
this.pipeName = pipeName;
this.creationTime = creationTime;
+ this.regionId = regionId;
}
@Override
@@ -40,4 +42,8 @@ public class PipeTaskRuntimeEnvironment implements
PipeRuntimeEnvironment {
public long getCreationTime() {
return creationTime;
}
+
+ /** Returns the region id this environment was constructed with. */
+ public int getRegionId() {
+ return regionId;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index a4c34ae22bc..87f36db8191 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -159,6 +159,11 @@ public class PipeRow implements Row {
String.format("column %s not found", columnName.getFullPath()));
}
+ @Override
+ public String getColumnName(int columnIndex) {
+ // Plain array lookup: columnIndex is not range-checked here.
+ return columnNameStringList[columnIndex];
+ }
+
@Override
public List<Type> getColumnTypes() {
return
PipeDataTypeTransformer.transformToPipeDataTypeList(Arrays.asList(valueColumnTypes));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
new file mode 100644
index 00000000000..59dbc21df9d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
+
+/**
+ * A {@link PipeProcessor} that thins out tablet data by time before it is handed to the next
+ * stage.
+ *
+ * <p>For every time series the timestamp of the last forwarded sample is remembered in a {@link
+ * PartialPathLastTimeCache}. A row is forwarded when at least one of its non-null measurements
+ * either has no remembered timestamp or lies at least the configured interval away from it. {@link
+ * TsFileInsertionEvent}s are only parsed into tablets when splitting is enabled; otherwise they
+ * are forwarded untouched.
+ */
+public class DownSamplingProcessor implements PipeProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DownSamplingProcessor.class);
+
+  // Database name of the data region followed by the path separator, e.g. "root.db.".
+  private String dataBaseNameWithPathSeparator;
+  // Sampling interval converted to the timestamp precision currently in use.
+  private long intervalInCurrentPrecision;
+  private boolean shouldSplitFile;
+
+  private PartialPathLastTimeCache partialPathLastTimeCache;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // No need to validate.
+  }
+
+  /**
+   * Reads the down-sampling parameters, resolves the database name of the data region this
+   * processor runs in and creates the last-time cache.
+   *
+   * @param parameters the processor parameters given when the pipe was created
+   * @param configuration runtime configuration; its runtime environment is cast to {@link
+   *     PipeTaskProcessorRuntimeEnvironment} to obtain the region id
+   * @throws Exception if the parameters cannot be read or the environment cannot be resolved
+   */
+  @Override
+  public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    final String dataBaseName =
+        StorageEngine.getInstance()
+            .getDataRegion(
+                new DataRegionId(
+                    ((PipeTaskProcessorRuntimeEnvironment) configuration.getRuntimeEnvironment())
+                        .getRegionId()))
+            .getDatabaseName();
+    final long intervalSeconds =
+        parameters.getLongOrDefault(
+            PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
+            PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_DEFAULT_VALUE);
+    final long memoryLimitInBytes =
+        parameters.getLongOrDefault(
+            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+            PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_DEFAULT_VALUE);
+    shouldSplitFile =
+        parameters.getBooleanOrDefault(
+            PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
+            PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_DEFAULT_VALUE);
+    LOGGER.info(
+        "DownSamplingProcessor in {} is initialized with {}: {}s, {}: {}, {}: {}.",
+        dataBaseName,
+        PROCESSOR_DOWN_SAMPLING_INTERVAL_SECONDS_KEY,
+        intervalSeconds,
+        PROCESSOR_DOWN_SAMPLING_MEMORY_LIMIT_IN_BYTES_KEY,
+        memoryLimitInBytes,
+        PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY,
+        shouldSplitFile);
+
+    dataBaseNameWithPathSeparator = dataBaseName + TsFileConstant.PATH_SEPARATOR;
+    intervalInCurrentPrecision =
+        TimestampPrecisionUtils.convertToCurrPrecision(intervalSeconds, TimeUnit.SECONDS);
+
+    partialPathLastTimeCache = new PartialPathLastTimeCache(memoryLimitInBytes);
+  }
+
+  /**
+   * Down-samples a tablet event row by row and forwards the resulting events.
+   *
+   * <p>Events that are neither {@link PipeInsertNodeTabletInsertionEvent} nor {@link
+   * PipeRawTabletInsertionEvent} are forwarded unchanged.
+   *
+   * @throws Exception the last exception recorded while collecting rows or events, if any
+   */
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      eventCollector.collect(tabletInsertionEvent);
+      return;
+    }
+
+    final AtomicReference<String> deviceSuffix = new AtomicReference<>();
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    tabletInsertionEvent
+        .processRowByRow(
+            (row, rowCollector) -> {
+              // To reduce the memory usage, we use the device suffix
+              // instead of the full path as the key.
+              if (deviceSuffix.get() == null) {
+                deviceSuffix.set(stripDatabasePrefix(row.getDeviceId()));
+              }
+
+              processRow(row, rowCollector, deviceSuffix.get(), exception);
+            })
+        .forEach(
+            event -> {
+              try {
+                eventCollector.collect(event);
+              } catch (Exception e) {
+                exception.set(e);
+              }
+            });
+
+    if (exception.get() != null) {
+      throw exception.get();
+    }
+  }
+
+  /**
+   * Removes the leading "database name + path separator" from a device id.
+   *
+   * <p>The prefix is compared literally. It must not be handed to a regex-based API such as {@link
+   * String#replaceFirst(String, String)}: the separators and any special characters in the
+   * database name would then be interpreted as regular-expression syntax.
+   *
+   * @return the device id without the database prefix, or the unchanged device id when it does not
+   *     start with that prefix
+   */
+  private String stripDatabasePrefix(String deviceId) {
+    return deviceId.startsWith(dataBaseNameWithPathSeparator)
+        ? deviceId.substring(dataBaseNameWithPathSeparator.length())
+        : deviceId;
+  }
+
+  /**
+   * Decides whether a single row is forwarded and updates the last-time cache accordingly.
+   *
+   * @param row the row to inspect
+   * @param rowCollector receives the row when it is kept
+   * @param deviceSuffix the device id without the database prefix
+   * @param exception holder that records an exception thrown while collecting the row
+   */
+  private void processRow(
+      Row row,
+      RowCollector rowCollector,
+      String deviceSuffix,
+      AtomicReference<Exception> exception) {
+    // True as soon as one non-null measurement of this row is due for sampling.
+    boolean shouldCollectRow = false;
+
+    for (int index = 0, size = row.size(); index < size; ++index) {
+      if (row.isNull(index)) {
+        continue;
+      }
+
+      final String timeSeriesSuffix =
+          deviceSuffix + TsFileConstant.PATH_SEPARATOR + row.getColumnName(index);
+      final Long lastSampleTime = partialPathLastTimeCache.getPartialPathLastTime(timeSeriesSuffix);
+
+      if (lastSampleTime != null) {
+        // The absolute distance is used, so rows older than the last sample can qualify too.
+        if (Math.abs(row.getTime() - lastSampleTime) >= intervalInCurrentPrecision) {
+          shouldCollectRow = true;
+          partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix, row.getTime());
+        }
+      } else {
+        // Nothing remembered for this series (never seen or no longer cached): keep the row.
+        shouldCollectRow = true;
+        partialPathLastTimeCache.setPartialPathLastTime(timeSeriesSuffix, row.getTime());
+      }
+    }
+
+    if (shouldCollectRow) {
+      try {
+        rowCollector.collectRow(row);
+      } catch (Exception e) {
+        exception.set(e);
+      }
+    }
+  }
+
+  /**
+   * If data comes in {@link TsFileInsertionEvent}, we will not split it into {@link
+   * TabletInsertionEvent} by default, because the data in {@link TsFileInsertionEvent} is already
+   * compressed, down-sampling will not reduce the size of data but will increase the CPU usage.
+   */
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws Exception {
+    if (shouldSplitFile) {
+      try {
+        for (final TabletInsertionEvent tabletInsertionEvent :
+            tsFileInsertionEvent.toTabletInsertionEvents()) {
+          process(tabletInsertionEvent, eventCollector);
+        }
+      } finally {
+        tsFileInsertionEvent.close();
+      }
+    } else {
+      eventCollector.collect(tsFileInsertionEvent);
+    }
+  }
+
+  /** Forwards any other kind of event unchanged. */
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws Exception {
+    eventCollector.collect(event);
+  }
+
+  /** Closes the last-time cache if {@code customize} created one. */
+  @Override
+  public void close() throws Exception {
+    if (partialPathLastTimeCache != null) {
+      partialPathLastTimeCache.close();
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
new file mode 100644
index 00000000000..226dbe361f5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastTimeCache.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling;
+
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.utils.MemUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bounded, map-like lookup from a partial time-series path to the timestamp last recorded for it.
+ * The entries live in a weight-limited cache, so a lookup may return {@code null} even for a path
+ * that was stored before; callers must be prepared to handle such a miss.
+ */
+public class PartialPathLastTimeCache implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartialPathLastTimeCache.class);
+
+  private final PipeMemoryBlock allocatedMemoryBlock;
+  // Scales the weight reported for each entry when the memory block shrinks or expands
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
+
+  private final Cache<String, Long> partialPath2TimeCache;
+
+  public PartialPathLastTimeCache(long memoryLimitInBytes) {
+    allocatedMemoryBlock =
+        PipeResourceManager.memory()
+            .tryAllocate(memoryLimitInBytes)
+            .setShrinkMethod(currentSize -> Math.max(currentSize / 2, 1))
+            .setShrinkCallback(
+                (sizeBefore, sizeAfter) -> {
+                  final double shrinkRatio = (double) sizeBefore / sizeAfter;
+                  memoryUsageCheatFactor.set(memoryUsageCheatFactor.get() * shrinkRatio);
+                  LOGGER.info(
+                      "PartialPathLastTimeCache.allocatedMemoryBlock has shrunk from {} to {}.",
+                      sizeBefore,
+                      sizeAfter);
+                })
+            .setExpandMethod(
+                currentSize -> Math.min(Math.max(currentSize, 1) * 2, memoryLimitInBytes))
+            .setExpandCallback(
+                (sizeBefore, sizeAfter) -> {
+                  final double growthRatio = (double) sizeAfter / sizeBefore;
+                  memoryUsageCheatFactor.set(memoryUsageCheatFactor.get() / growthRatio);
+                  LOGGER.info(
+                      "PartialPathLastTimeCache.allocatedMemoryBlock has expanded from {} to {}.",
+                      sizeBefore,
+                      sizeAfter);
+                });
+
+    // No metric is registered for this cache for now: it is not a constant cache and its numbers
+    // may fluctuate. In the future all the "processorCache"s may be recorded in a single metric
+    // entry.
+    final Weigher<String, Long> entryWeigher = this::weightOf;
+    partialPath2TimeCache =
+        Caffeine.newBuilder()
+            .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+            .weigher(entryWeigher)
+            .build();
+  }
+
+  /**
+   * Computes the cache weight of one entry: the memory of the key string plus one long, scaled by
+   * the current cheat factor and clamped to the int range. A non-positive result is reported as
+   * {@link Integer#MAX_VALUE}.
+   *
+   * <p>Here the partial path is a part of the full path adequate to inspect the last time.
+   */
+  private int weightOf(String partialPath, Long timeStamp) {
+    final long scaledWeight =
+        (long) ((MemUtils.getStringMem(partialPath) + Long.BYTES) * memoryUsageCheatFactor.get());
+    if (scaledWeight <= 0 || scaledWeight > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int) scaledWeight;
+  }
+
+  /////////////////////////// Getter & Setter ///////////////////////////
+
+  /** Returns the timestamp stored for the path, or {@code null} when the cache holds none. */
+  public Long getPartialPathLastTime(String partialPath) {
+    return partialPath2TimeCache.getIfPresent(partialPath);
+  }
+
+  /** Stores (or overwrites) the timestamp for the path. */
+  public void setPartialPathLastTime(String partialPath, long timeStamp) {
+    partialPath2TimeCache.put(partialPath, timeStamp);
+  }
+
+  /////////////////////////// Close ///////////////////////////
+
+  /** Drops every cached entry, then closes the allocated memory block. */
+  @Override
+  public void close() throws Exception {
+    partialPath2TimeCache.invalidateAll();
+    allocatedMemoryBlock.close();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 642683fb703..e0306eea7da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
-import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
public class PipeMemoryBlock implements AutoCloseable {
@@ -41,9 +41,9 @@ public class PipeMemoryBlock implements AutoCloseable {
private final AtomicLong memoryUsageInBytes = new AtomicLong(0);
- private final AtomicReference<Function<Long, Long>> shrinkMethod = new
AtomicReference<>();
+ private final AtomicReference<LongUnaryOperator> shrinkMethod = new
AtomicReference<>();
private final AtomicReference<BiConsumer<Long, Long>> shrinkCallback = new
AtomicReference<>();
- private final AtomicReference<Function<Long, Long>> expandMethod = new
AtomicReference<>();
+ private final AtomicReference<LongUnaryOperator> expandMethod = new
AtomicReference<>();
private final AtomicReference<BiConsumer<Long, Long>> expandCallback = new
AtomicReference<>();
private volatile boolean isReleased = false;
@@ -60,7 +60,7 @@ public class PipeMemoryBlock implements AutoCloseable {
this.memoryUsageInBytes.set(memoryUsageInBytes);
}
- public PipeMemoryBlock setShrinkMethod(Function<Long, Long> shrinkMethod) {
+ public PipeMemoryBlock setShrinkMethod(LongUnaryOperator shrinkMethod) {
this.shrinkMethod.set(shrinkMethod);
return this;
}
@@ -70,7 +70,7 @@ public class PipeMemoryBlock implements AutoCloseable {
return this;
}
- public PipeMemoryBlock setExpandMethod(Function<Long, Long> extendMethod) {
+ public PipeMemoryBlock setExpandMethod(LongUnaryOperator extendMethod) {
this.expandMethod.set(extendMethod);
return this;
}
@@ -97,7 +97,7 @@ public class PipeMemoryBlock implements AutoCloseable {
}
final long oldMemorySizeInBytes = memoryUsageInBytes.get();
- final long newMemorySizeInBytes =
shrinkMethod.get().apply(memoryUsageInBytes.get());
+ final long newMemorySizeInBytes =
shrinkMethod.get().applyAsLong(memoryUsageInBytes.get());
final long memoryInBytesCanBeReleased = oldMemorySizeInBytes -
newMemorySizeInBytes;
if (memoryInBytesCanBeReleased <= 0
@@ -132,7 +132,7 @@ public class PipeMemoryBlock implements AutoCloseable {
}
final long oldMemorySizeInBytes = memoryUsageInBytes.get();
- final long newMemorySizeInBytes =
expandMethod.get().apply(memoryUsageInBytes.get());
+ final long newMemorySizeInBytes =
expandMethod.get().applyAsLong(memoryUsageInBytes.get());
final long memoryInBytesNeededToBeAllocated = newMemorySizeInBytes -
oldMemorySizeInBytes;
if (memoryInBytesNeededToBeAllocated <= 0
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 4bcf0e44c76..99de8bc7b00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.stage;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
+import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
@@ -70,7 +70,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// 2. customize processor
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
- new PipeTaskRuntimeConfiguration(new
PipeTaskRuntimeEnvironment(pipeName, creationTime));
+ new PipeTaskRuntimeConfiguration(
+ new PipeTaskProcessorRuntimeEnvironment(
+ pipeName, creationTime, dataRegionId.getId()));
pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
} catch (Exception e) {
throw new PipeException(e.getMessage(), e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 7cc10aadf25..a7cd9b8ac0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -59,11 +59,11 @@ public class WALInsertNodeCache {
private static final Logger LOGGER =
LoggerFactory.getLogger(WALInsertNodeCache.class);
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final PipeMemoryBlock allocatedMemoryBlock;
// Used to adjust the memory usage of the cache
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
+ // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>>
lruCache;
// ids of all pinned memTables
@@ -80,7 +80,7 @@ public class WALInsertNodeCache {
allocatedMemoryBlock =
PipeResourceManager.memory()
.tryAllocate(requestedAllocateSize)
- .setShrinkMethod((oldMemory) -> Math.max(oldMemory / 2, 1))
+ .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
.setShrinkCallback(
(oldMemory, newMemory) -> {
memoryUsageCheatFactor.set(
@@ -93,14 +93,14 @@ public class WALInsertNodeCache {
newMemory);
})
.setExpandMethod(
- (oldMemory) -> Math.min(Math.max(oldMemory, 1) * 2,
requestedAllocateSize))
+ oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
requestedAllocateSize))
.setExpandCallback(
(oldMemory, newMemory) -> {
memoryUsageCheatFactor.set(
memoryUsageCheatFactor.get() / ((double) newMemory /
oldMemory));
isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
LOGGER.info(
- "WALInsertNodeCache allocatedMemoryBlock of dataRegion
{} has expanded from {} to {}.",
+ "WALInsertNodeCache.allocatedMemoryBlock of dataRegion
{} has expanded from {} to {}.",
dataRegionId,
oldMemory,
newMemory);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 96c6bbae38f..f2063aa80e2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -30,7 +30,10 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WriteBackConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DownSamplingProcessor;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -43,6 +46,7 @@ public enum BuiltinPipePlugin {
// processors
DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
+ DOWN_SAMPLING_PROCESSOR("down-sampling-processor",
DownSamplingProcessor.class),
// connectors
DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -88,24 +92,27 @@ public enum BuiltinPipePlugin {
return className;
}
- public static final Set<String> SHOW_PIPE_PLUGINS_BLACKLIST = new
HashSet<>();
-
- static {
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_EXTRACTOR.getPipePluginName().toUpperCase());
-
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(OPC_UA_CONNECTOR.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase());
-
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase());
-
SHOW_PIPE_PLUGINS_BLACKLIST.add(WEBSOCKET_SINK.getPipePluginName().toUpperCase());
- }
+ public static final Set<String> SHOW_PIPE_PLUGINS_BLACKLIST =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ // Extractors
+ IOTDB_EXTRACTOR.getPipePluginName().toUpperCase(),
+ // Processors
+ DOWN_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
+ // Connectors
+ DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
+ IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
+
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
+
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
+
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
+ IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
+ WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
+ OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
+ WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
+ // Sinks
+ IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
+ IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
+ IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
+ WEBSOCKET_SINK.getPipePluginName().toUpperCase())));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DownSamplingProcessor.java
similarity index 63%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DownSamplingProcessor.java
index 8caa87c03ec..1658876fa10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeProcessorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DownSamplingProcessor.java
@@ -17,13 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.config.constant;
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
-public class PipeProcessorConstant {
-
- public static final String PROCESSOR_KEY = "processor";
-
- private PipeProcessorConstant() {
- throw new IllegalStateException("Utility class");
- }
-}
+/**
+ * This class is a placeholder and should not be initialized. It represents
the Down Sampling
+ * processor. There is a real implementation in the server module but it cannot
be imported here. The
+ * pipe agent in the server module will replace this class with the real
implementation when
+ * initializing the Down Sampling processor.
+ */
+public class DownSamplingProcessor extends PlaceHolderProcessor {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/PlaceHolderProcessor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/PlaceHolderProcessor.java
new file mode 100644
index 00000000000..431c6ece304
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/PlaceHolderProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents
all the IoTDB pipe
+ * processors that cannot be implemented in the node-commons module. Each
IoTDB pipe processor has
+ * a real implementation in the server module but it cannot be imported here. The
pipe agent in the
+ * server module will replace this class with the real implementation when
initializing the IoTDB
+ * pipe processor.
+ */
+public class PlaceHolderProcessor implements PipeProcessor {
+
+ private static final String PLACEHOLDER_ERROR_MSG =
+ "This class is a placeholder and should not be used.";
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+ }
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeProcessorRuntimeConfiguration configuration)
+ throws Exception {
+ throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
+ throws Exception {
+ throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+ }
+
+ @Override
+ public void process(Event event, EventCollector eventCollector) throws
Exception {
+ throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+ }
+
+ @Override
+ public void close() throws Exception {
+ throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
+ }
+}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
index ac1f6190c2d..c4383231f90 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
@@ -678,6 +678,16 @@ public class BytesUtils {
return result;
}
+ /**
+ * Return a deep copy of the given byte array.
+ *
+ * @param src input byte array
+ * @return a new byte array with the same contents as src
+ */
+ public static byte[] deepCopy(byte[] src) {
+ return subBytes(src, 0, src.length);
+ }
+
/**
* cut out specified length byte array from parameter start from input byte
array src and return.
*