This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bb99334754f Pipe: implement RenameDatabaseProcessor &
WriteBackConnector for table model events (#14131)
bb99334754f is described below
commit bb99334754f4e5ac535c178417161ca6177c13d5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Nov 26 21:40:48 2024 +0800
Pipe: implement RenameDatabaseProcessor & WriteBackConnector for table
model events (#14131)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.github/workflows/pipe-it-2cluster.yml | 18 ++++
.../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java | 59 ++++++++++
.../PipeDataRegionProcessorConstructor.java | 4 +
.../protocol/writeback/WriteBackConnector.java | 120 ++++++++++-----------
.../db/pipe/event/common/PipeInsertionEvent.java | 4 +
.../schemachange/RenameDatabaseProcessor.java | 101 +++++++++++++++++
.../agent/plugin/builtin/BuiltinPipePlugin.java | 3 +
.../schemachange/RenameDatabaseProcessor.java | 28 +++++
.../config/constant/PipeProcessorConstant.java | 2 +
9 files changed, 279 insertions(+), 60 deletions(-)
diff --git a/.github/workflows/pipe-it-2cluster.yml
b/.github/workflows/pipe-it-2cluster.yml
index 7c2143d6d2c..2ef400d1855 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -49,6 +49,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in
client.yml
@@ -108,6 +111,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in
client.yml
@@ -146,6 +152,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in
client.yml
@@ -184,6 +193,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in
client.yml
@@ -222,6 +234,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in
client.yml
@@ -259,6 +274,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
+ - name: Sleep for a random duration between 0 and 10000 milliseconds
+ run: |
+ sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in
client.yml
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
index 5c936041743..c12c75d4b52 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
@@ -216,4 +216,63 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeTableModelTestIT {
TableModelUtils.assertCountData("test", "test", 150, receiverEnv);
}
}
+
+ @Test
+ public void testWriteBackSink() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+ TableModelUtils.insertData("test", "test", 0, 50, senderEnv, true);
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)",
"flush"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("capture.table", "true");
+ extractorAttributes.put("capture.tree", "true");
+ extractorAttributes.put("forwarding-pipe-requests", "false");
+
+ processorAttributes.put("processor", "rename-database-processor");
+ processorAttributes.put("processor.new-db-name", "test1");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "true");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.realtime-first", "false");
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ TableModelUtils.insertData("test", "test", 50, 100, senderEnv, true);
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)",
"flush"))) {
+ return;
+ }
+
+ TableModelUtils.assertCountData("test1", "test", 100, receiverEnv);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 44c6ef17800..31cc8250ebf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSam
import
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
import
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
+import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor;
import
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
@@ -68,5 +69,8 @@ class PipeDataRegionProcessorConstructor extends
PipeProcessorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
PipeConsensusProcessor::new);
+ pluginConstructors.put(
+ BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
+ RenameDatabaseProcessor::new);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 7889b778d86..84a82bc052f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -20,45 +20,46 @@
package org.apache.iotdb.db.pipe.connector.protocol.writeback;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
-import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
-import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.ZoneId;
+import java.io.IOException;
import java.util.Objects;
public class WriteBackConnector implements PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(WriteBackConnector.class);
+ // Simulate the behavior of the client-to-server communication
+ // for correctly handling data insertion in IoTDBReceiverAgent#receive method
+ private static final Coordinator COORDINATOR = Coordinator.getInstance();
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+ private IClientSession session;
+
+ private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;
+
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
// Do nothing
@@ -68,7 +69,16 @@ public class WriteBackConnector implements PipeConnector {
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
- // Do nothing
+ final PipeRuntimeEnvironment environment =
configuration.getRuntimeEnvironment();
+ session =
+ new InternalClientSession(
+ String.format(
+ "%s_%s_%s_%s",
+ WriteBackConnector.class.getSimpleName(),
+ environment.getPipeName(),
+ environment.getCreationTime(),
+ environment.getRegionId()));
+ SESSION_MANAGER.registerSession(session);
}
@Override
@@ -101,16 +111,9 @@ public class WriteBackConnector implements PipeConnector {
}
}
- @Override
- public void transfer(final Event event) throws Exception {
- if (!(event instanceof PipeHeartbeatEvent || event instanceof
PipeTerminateEvent)) {
- LOGGER.warn("WriteBackConnector does not support transferring generic
event: {}.", event);
- }
- }
-
private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws PipeException, WALPipeException {
+ throws PipeException, WALPipeException, IOException {
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
WriteBackConnector.class.getName())) {
@@ -126,29 +129,28 @@ public class WriteBackConnector implements PipeConnector {
private void doTransfer(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws PipeException, WALPipeException {
- final TSStatus status;
-
+ throws PipeException, WALPipeException, IOException {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
- if (Objects.isNull(insertNode)) {
- status =
- PipeDataNodeAgent.receiver()
- .thrift()
- .receive(
- PipeTransferTabletBinaryReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
- .getStatus();
- } else {
- final InsertBaseStatement statement =
-
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
- status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS :
executeStatement(statement);
- }
+ final String dataBaseName =
+ pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+ ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+ : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
+ final TSStatus status =
+ PipeDataNodeAgent.receiver()
+ .thrift()
+ .receive(
+ Objects.isNull(insertNode)
+ ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
dataBaseName)
+ : PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
+ insertNode, dataBaseName))
+ .getStatus();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
- "Transfer PipeInsertNodeTabletInsertionEvent %s error, result
status %s",
+ "Write back PipeInsertNodeTabletInsertionEvent %s error, result
status %s",
pipeInsertNodeTabletInsertionEvent, status));
}
}
@@ -168,37 +170,35 @@ public class WriteBackConnector implements PipeConnector {
private void doTransfer(final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException {
- final InsertBaseStatement statement =
- PipeTransferTabletRawReq.toTPipeTransferRawReq(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned())
- .constructStatement();
final TSStatus status =
- statement.isEmpty() ? RpcUtils.SUCCESS_STATUS :
executeStatement(statement);
-
+ PipeDataNodeAgent.receiver()
+ .thrift()
+ .receive(
+ PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned(),
+ pipeRawTabletInsertionEvent.isTableModelEvent()
+ ?
pipeRawTabletInsertionEvent.getTableModelDatabaseName()
+ : TREE_MODEL_DATABASE_NAME_IDENTIFIER))
+ .getStatus();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
- "Transfer PipeRawTabletInsertionEvent %s error, result status
%s",
+ "Write back PipeRawTabletInsertionEvent %s error, result status
%s",
pipeRawTabletInsertionEvent, status));
}
}
- private TSStatus executeStatement(final InsertBaseStatement statement) {
- return Coordinator.getInstance()
- .executeForTreeModel(
- new PipeEnrichedStatement(statement),
- SessionManager.getInstance().requestQueryId(),
- new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
- "",
- ClusterPartitionFetcher.getInstance(),
- ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
- .status;
+ @Override
+ public void transfer(final Event event) throws Exception {
+ // Ignore the event except TabletInsertionEvent
}
@Override
public void close() throws Exception {
- // Do nothing
+ if (session != null) {
+ SESSION_MANAGER.closeSession(session,
COORDINATOR::cleanupQueryExecution);
+ }
+ SESSION_MANAGER.removeCurrSession();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index 8ecfae9255f..cb9182aff58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -103,4 +103,8 @@ public abstract class PipeInsertionEvent extends
EnrichedEvent {
: treeModelDatabaseName
: tableModelDatabaseName;
}
+
+ public void renameTableModelDatabase(final String tableModelDatabaseName) {
+ this.tableModelDatabaseName = tableModelDatabaseName;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
new file mode 100644
index 00000000000..1d611a440d1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.schemachange;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_RENAME_DATABASE_NEW_DB_NAME;
+import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+public class RenameDatabaseProcessor implements PipeProcessor {
+
+ // Currently this processor is only used for table model.
+ // For tree model events, this processor will simply ignore them
+ private String newDatabaseName;
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ validator.validateRequiredAttribute(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME);
+ newDatabaseName =
validator.getParameters().getString(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME);
+ try {
+ TableConfigTaskVisitor.validateDatabaseName(newDatabaseName);
+ } catch (final Exception e) {
+ throw new PipeException(
+ String.format(
+ "The new database name %s is invalid, it should not contain
'%s', "
+ + "should match the pattern %s, and the length should not
exceed %d",
+ newDatabaseName,
+ PATH_SEPARATOR,
+ IoTDBConfig.STORAGE_GROUP_PATTERN,
+ MAX_DATABASE_NAME_LENGTH),
+ e);
+ }
+ }
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeProcessorRuntimeConfiguration configuration)
+ throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
+ throws Exception {
+ renameDatabase(tabletInsertionEvent, eventCollector);
+ }
+
+ @Override
+ public void process(TsFileInsertionEvent tsFileInsertionEvent,
EventCollector eventCollector)
+ throws Exception {
+ renameDatabase(tsFileInsertionEvent, eventCollector);
+ }
+
+ @Override
+ public void process(Event event, EventCollector eventCollector) throws
Exception {
+ renameDatabase(event, eventCollector);
+ }
+
+ private void renameDatabase(final Event event, final EventCollector
eventCollector)
+ throws Exception {
+ // This processor is only used for table model insertion events
+ if (event instanceof PipeInsertionEvent && ((PipeInsertionEvent)
event).isTableModelEvent()) {
+ ((PipeInsertionEvent) event).renameTableModelDatabase(newDatabaseName);
+ }
+
+ eventCollector.collect(event);
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Do nothing
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 67f3a0265af..fdbeb5bd74f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor;
+import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor;
@@ -72,6 +73,7 @@ public enum BuiltinPipePlugin {
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor",
StandardStatisticsProcessor.class),
TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor",
TumblingWindowingProcessor.class),
PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor",
PipeConsensusProcessor.class),
+ RENAME_DATABASE_PROCESSOR("rename-database-processor",
RenameDatabaseProcessor.class),
// connectors
DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -143,6 +145,7 @@ public enum BuiltinPipePlugin {
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
+ RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
// Connectors
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/schemachange/RenameDatabaseProcessor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/schemachange/RenameDatabaseProcessor.java
new file mode 100644
index 00000000000..c5166c1d2b7
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/schemachange/RenameDatabaseProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the
+ * rename-database-processor. There is a real implementation in the server
module but cannot be
+ * imported here. The pipe agent in the server module will replace this class
with the real
+ * implementation when initializing the rename-database-processor.
+ */
+public class RenameDatabaseProcessor {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index f8aef880bd9..3874be8e817 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -80,6 +80,8 @@ public class PipeProcessorConstant {
public static final String _PROCESSOR_OUTPUT_SERIES_KEY =
"processor.output-series";
public static final String PROCESSOR_OUTPUT_SERIES_KEY =
"processor.output.series";
+ public static final String PROCESSOR_RENAME_DATABASE_NEW_DB_NAME =
"processor.new-db-name";
+
private PipeProcessorConstant() {
throw new IllegalStateException("Utility class");
}