This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bb99334754f Pipe: implement RenameDatabaseProcessor & WriteBackConnector for table model events (#14131)
bb99334754f is described below

commit bb99334754f4e5ac535c178417161ca6177c13d5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Nov 26 21:40:48 2024 +0800

    Pipe: implement RenameDatabaseProcessor & WriteBackConnector for table model events (#14131)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .github/workflows/pipe-it-2cluster.yml             |  18 ++++
 .../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java    |  59 ++++++++++
 .../PipeDataRegionProcessorConstructor.java        |   4 +
 .../protocol/writeback/WriteBackConnector.java     | 120 ++++++++++-----------
 .../db/pipe/event/common/PipeInsertionEvent.java   |   4 +
 .../schemachange/RenameDatabaseProcessor.java      | 101 +++++++++++++++++
 .../agent/plugin/builtin/BuiltinPipePlugin.java    |   3 +
 .../schemachange/RenameDatabaseProcessor.java      |  28 +++++
 .../config/constant/PipeProcessorConstant.java     |   2 +
 9 files changed, 279 insertions(+), 60 deletions(-)

diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml
index 7c2143d6d2c..2ef400d1855 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -49,6 +49,9 @@ jobs:
         with:
           distribution: liberica
           java-version: ${{ matrix.java }}
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
       - name: IT Test
         shell: bash
         # we do not compile client-cpp for saving time, it is tested in 
client.yml
@@ -108,6 +111,9 @@ jobs:
         with:
           distribution: liberica
           java-version: ${{ matrix.java }}
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
       - name: IT Test
         shell: bash
         # we do not compile client-cpp for saving time, it is tested in 
client.yml
@@ -146,6 +152,9 @@ jobs:
         with:
           distribution: liberica
           java-version: ${{ matrix.java }}
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
       - name: IT Test
         shell: bash
         # we do not compile client-cpp for saving time, it is tested in 
client.yml
@@ -184,6 +193,9 @@ jobs:
         with:
           distribution: liberica
           java-version: ${{ matrix.java }}
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
       - name: IT Test
         shell: bash
         # we do not compile client-cpp for saving time, it is tested in 
client.yml
@@ -222,6 +234,9 @@ jobs:
         with:
           distribution: liberica
           java-version: ${{ matrix.java }}
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
       - name: IT Test
         shell: bash
         # we do not compile client-cpp for saving time, it is tested in 
client.yml
@@ -259,6 +274,9 @@ jobs:
         with:
           distribution: liberica
           java-version: ${{ matrix.java }}
+      - name: Sleep for a random duration between 0 and 10000 milliseconds
+        run: |
+          sleep  $(( $(( RANDOM % 10000 + 1 )) / 1000))
       - name: IT Test
         shell: bash
         # we do not compile client-cpp for saving time, it is tested in 
client.yml
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
index 5c936041743..c12c75d4b52 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeDataSinkIT.java
@@ -216,4 +216,63 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeTableModelTestIT {
       TableModelUtils.assertCountData("test", "test", 150, receiverEnv);
     }
   }
+
+  @Test
+  public void testWriteBackSink() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+      TableModelUtils.insertData("test", "test", 0, 50, senderEnv, true);
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", 
"flush"))) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("capture.table", "true");
+      extractorAttributes.put("capture.tree", "true");
+      extractorAttributes.put("forwarding-pipe-requests", "false");
+
+      processorAttributes.put("processor", "rename-database-processor");
+      processorAttributes.put("processor.new-db-name", "test1");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "true");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.realtime-first", "false");
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      TableModelUtils.insertData("test", "test", 50, 100, senderEnv, true);
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", 
"flush"))) {
+        return;
+      }
+
+      TableModelUtils.assertCountData("test1", "test", 100, receiverEnv);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 44c6ef17800..31cc8250ebf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSam
 import 
org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
 import 
org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
+import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor;
 import 
org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
 
 class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
@@ -68,5 +69,8 @@ class PipeDataRegionProcessorConstructor extends 
PipeProcessorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
         PipeConsensusProcessor::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
+        RenameDatabaseProcessor::new);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 7889b778d86..84a82bc052f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -20,45 +20,46 @@
 package org.apache.iotdb.db.pipe.connector.protocol.writeback;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
-import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
-import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.ZoneId;
+import java.io.IOException;
 import java.util.Objects;
 
 public class WriteBackConnector implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WriteBackConnector.class);
 
+  // Simulate the behavior of the client-to-server communication
+  // for correctly handling data insertion in IoTDBReceiverAgent#receive method
+  private static final Coordinator COORDINATOR = Coordinator.getInstance();
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+  private IClientSession session;
+
+  private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;
+
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     // Do nothing
@@ -68,7 +69,16 @@ public class WriteBackConnector implements PipeConnector {
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
-    // Do nothing
+    final PipeRuntimeEnvironment environment = 
configuration.getRuntimeEnvironment();
+    session =
+        new InternalClientSession(
+            String.format(
+                "%s_%s_%s_%s",
+                WriteBackConnector.class.getSimpleName(),
+                environment.getPipeName(),
+                environment.getCreationTime(),
+                environment.getRegionId()));
+    SESSION_MANAGER.registerSession(session);
   }
 
   @Override
@@ -101,16 +111,9 @@ public class WriteBackConnector implements PipeConnector {
     }
   }
 
-  @Override
-  public void transfer(final Event event) throws Exception {
-    if (!(event instanceof PipeHeartbeatEvent || event instanceof 
PipeTerminateEvent)) {
-      LOGGER.warn("WriteBackConnector does not support transferring generic 
event: {}.", event);
-    }
-  }
-
   private void doTransferWrapper(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws PipeException, WALPipeException {
+      throws PipeException, WALPipeException, IOException {
     // We increase the reference count for this event to determine if the 
event may be released.
     if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
         WriteBackConnector.class.getName())) {
@@ -126,29 +129,28 @@ public class WriteBackConnector implements PipeConnector {
 
   private void doTransfer(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws PipeException, WALPipeException {
-    final TSStatus status;
-
+      throws PipeException, WALPipeException, IOException {
     final InsertNode insertNode =
         pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
-    if (Objects.isNull(insertNode)) {
-      status =
-          PipeDataNodeAgent.receiver()
-              .thrift()
-              .receive(
-                  PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                      pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
-              .getStatus();
-    } else {
-      final InsertBaseStatement statement =
-          
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
-      status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : 
executeStatement(statement);
-    }
+    final String dataBaseName =
+        pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
+            ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
+            : TREE_MODEL_DATABASE_NAME_IDENTIFIER;
 
+    final TSStatus status =
+        PipeDataNodeAgent.receiver()
+            .thrift()
+            .receive(
+                Objects.isNull(insertNode)
+                    ? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
+                        pipeInsertNodeTabletInsertionEvent.getByteBuffer(), 
dataBaseName)
+                    : PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
+                        insertNode, dataBaseName))
+            .getStatus();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
-              "Transfer PipeInsertNodeTabletInsertionEvent %s error, result 
status %s",
+              "Write back PipeInsertNodeTabletInsertionEvent %s error, result 
status %s",
               pipeInsertNodeTabletInsertionEvent, status));
     }
   }
@@ -168,37 +170,35 @@ public class WriteBackConnector implements PipeConnector {
 
   private void doTransfer(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException {
-    final InsertBaseStatement statement =
-        PipeTransferTabletRawReq.toTPipeTransferRawReq(
-                pipeRawTabletInsertionEvent.convertToTablet(),
-                pipeRawTabletInsertionEvent.isAligned())
-            .constructStatement();
     final TSStatus status =
-        statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : 
executeStatement(statement);
-
+        PipeDataNodeAgent.receiver()
+            .thrift()
+            .receive(
+                PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
+                    pipeRawTabletInsertionEvent.convertToTablet(),
+                    pipeRawTabletInsertionEvent.isAligned(),
+                    pipeRawTabletInsertionEvent.isTableModelEvent()
+                        ? 
pipeRawTabletInsertionEvent.getTableModelDatabaseName()
+                        : TREE_MODEL_DATABASE_NAME_IDENTIFIER))
+            .getStatus();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
-              "Transfer PipeRawTabletInsertionEvent %s error, result status 
%s",
+              "Write back PipeRawTabletInsertionEvent %s error, result status 
%s",
               pipeRawTabletInsertionEvent, status));
     }
   }
 
-  private TSStatus executeStatement(final InsertBaseStatement statement) {
-    return Coordinator.getInstance()
-        .executeForTreeModel(
-            new PipeEnrichedStatement(statement),
-            SessionManager.getInstance().requestQueryId(),
-            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
-            "",
-            ClusterPartitionFetcher.getInstance(),
-            ClusterSchemaFetcher.getInstance(),
-            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
-        .status;
+  @Override
+  public void transfer(final Event event) throws Exception {
+    // Ignore the event except TabletInsertionEvent
   }
 
   @Override
   public void close() throws Exception {
-    // Do nothing
+    if (session != null) {
+      SESSION_MANAGER.closeSession(session, 
COORDINATOR::cleanupQueryExecution);
+    }
+    SESSION_MANAGER.removeCurrSession();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index 8ecfae9255f..cb9182aff58 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -103,4 +103,8 @@ public abstract class PipeInsertionEvent extends 
EnrichedEvent {
                 : treeModelDatabaseName
         : tableModelDatabaseName;
   }
+
+  public void renameTableModelDatabase(final String tableModelDatabaseName) {
+    this.tableModelDatabaseName = tableModelDatabaseName;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
new file mode 100644
index 00000000000..1d611a440d1
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/schemachange/RenameDatabaseProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.schemachange;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_RENAME_DATABASE_NEW_DB_NAME;
+import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+public class RenameDatabaseProcessor implements PipeProcessor {
+
+  // Currently this processor is only used for table model.
+  // For tree model events, this processor will simply ignore them
+  private String newDatabaseName;
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator.validateRequiredAttribute(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME);
+    newDatabaseName = 
validator.getParameters().getString(PROCESSOR_RENAME_DATABASE_NEW_DB_NAME);
+    try {
+      TableConfigTaskVisitor.validateDatabaseName(newDatabaseName);
+    } catch (final Exception e) {
+      throw new PipeException(
+          String.format(
+              "The new database name %s is invalid, it should not contain 
'%s', "
+                  + "should match the pattern %s, and the length should not 
exceed %d",
+              newDatabaseName,
+              PATH_SEPARATOR,
+              IoTDBConfig.STORAGE_GROUP_PATTERN,
+              MAX_DATABASE_NAME_LENGTH),
+          e);
+    }
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeProcessorRuntimeConfiguration configuration)
+      throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {
+    renameDatabase(tabletInsertionEvent, eventCollector);
+  }
+
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
+      throws Exception {
+    renameDatabase(tsFileInsertionEvent, eventCollector);
+  }
+
+  @Override
+  public void process(Event event, EventCollector eventCollector) throws 
Exception {
+    renameDatabase(event, eventCollector);
+  }
+
+  private void renameDatabase(final Event event, final EventCollector 
eventCollector)
+      throws Exception {
+    // This processor is only used for table model insertion events
+    if (event instanceof PipeInsertionEvent && ((PipeInsertionEvent) 
event).isTableModelEvent()) {
+      ((PipeInsertionEvent) event).renameTableModelDatabase(newDatabaseName);
+    }
+
+    eventCollector.collect(event);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Do nothing
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 67f3a0265af..fdbeb5bd74f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor;
+import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange.RenameDatabaseProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor;
 
@@ -72,6 +73,7 @@ public enum BuiltinPipePlugin {
   STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", 
StandardStatisticsProcessor.class),
   TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", 
TumblingWindowingProcessor.class),
   PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", 
PipeConsensusProcessor.class),
+  RENAME_DATABASE_PROCESSOR("rename-database-processor", 
RenameDatabaseProcessor.class),
 
   // connectors
   DO_NOTHING_CONNECTOR("do-nothing-connector", DoNothingConnector.class),
@@ -143,6 +145,7 @@ public enum BuiltinPipePlugin {
                   
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
                   
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
                   PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
+                  RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
                   // Connectors
                   DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
                   IOTDB_THRIFT_CONNECTOR.getPipePluginName().toUpperCase(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/schemachange/RenameDatabaseProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/schemachange/RenameDatabaseProcessor.java
new file mode 100644
index 00000000000..c5166c1d2b7
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/schemachange/RenameDatabaseProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.schemachange;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents the
+ * rename-database-processor. There is a real implementation in the server 
module but cannot be
+ * imported here. The pipe agent in the server module will replace this class 
with the real
+ * implementation when initializing the rename-database-processor.
+ */
+public class RenameDatabaseProcessor {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
index f8aef880bd9..3874be8e817 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java
@@ -80,6 +80,8 @@ public class PipeProcessorConstant {
   public static final String _PROCESSOR_OUTPUT_SERIES_KEY = 
"processor.output-series";
   public static final String PROCESSOR_OUTPUT_SERIES_KEY = 
"processor.output.series";
 
+  public static final String PROCESSOR_RENAME_DATABASE_NEW_DB_NAME = 
"processor.new-db-name";
+
   private PipeProcessorConstant() {
     throw new IllegalStateException("Utility class");
   }

Reply via email to