This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 935b9a8f00e Pipe: Fixed the config meta transfer problem (#16280)
935b9a8f00e is described below

commit 935b9a8f00ed739e31f71855326e4d81a29b474c
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 27 14:19:22 2025 +0800

    Pipe: Fixed the config meta transfer problem (#16280)
    
    * table-fix
    
    * refactor
    
    * fix
---
 .../response/pipe/task/PipeTableResp.java          | 11 ++--
 .../agent/runtime/PipeConfigNodeRuntimeAgent.java  | 11 +---
 .../pipe/agent/task/PipeConfigNodeSubtask.java     | 51 ++++++++---------
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   | 11 +---
 .../pipe/agent/task/PipeConfigNodeTaskBuilder.java |  6 +-
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     |  1 +
 .../pipe/source/IoTDBConfigRegionSource.java       |  2 +-
 .../confignode/persistence/pipe/PipeInfo.java      | 10 ++--
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 16 +++---
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  4 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |  4 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  4 +-
 .../pipe/util/PipeExternalSourceLoadBalancer.java  |  2 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 19 +++----
 .../agent/task/builder/PipeDataNodeBuilder.java    |  2 +-
 .../task/builder/PipeDataNodeTaskBuilder.java      |  6 +-
 .../pipe/agent/task/stage/PipeTaskSourceStage.java |  9 ++-
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 66 +++++++++++-----------
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  6 +-
 ...istoricalDataRegionTsFileAndDeletionSource.java |  6 +-
 .../realtime/PipeRealtimeDataRegionSource.java     |  6 +-
 .../db/pipe/source/mqtt/MQTTPublishHandler.java    |  4 +-
 .../iotdb/db/pipe/source/mqtt/MQTTSource.java      |  6 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 14 ++---
 .../subtask/SubscriptionSinkSubtaskManager.java    |  2 +-
 .../agent/plugin/PipeDataNodePluginAgentTest.java  |  4 +-
 .../db/pipe/consensus/DeletionResourceTest.java    |  4 +-
 .../pattern/CachedSchemaPatternMatcherTest.java    |  9 ++-
 .../db/pipe/source/PipeRealtimeExtractTest.java    | 10 ++--
 .../commons/pipe/agent/plugin/PipePluginAgent.java |  8 +--
 .../commons/pipe/agent/task/PipeTaskAgent.java     | 14 ++---
 .../pipe/agent/task/meta/PipeStaticMeta.java       | 58 +++++++++----------
 .../task/subtask/PipeAbstractSinkSubtask.java      | 16 +++---
 ....java => PipeTaskSourceRuntimeEnvironment.java} |  9 ++-
 .../iotdb/commons/pipe/source/IoTDBSource.java     |  6 +-
 35 files changed, 199 insertions(+), 218 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 37c3d1bd03b..ec596588b1e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -77,7 +77,7 @@ public class PipeTableResp implements DataSet {
           allPipeMeta.stream()
               .filter(pipeMeta -> 
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
               .findFirst()
-              .map(pipeMeta -> 
pipeMeta.getStaticMeta().getConnectorParameters().toString())
+              .map(pipeMeta -> 
pipeMeta.getStaticMeta().getSinkParameters().toString())
               .orElse(null);
 
       return new PipeTableResp(
@@ -87,7 +87,7 @@ public class PipeTableResp implements DataSet {
                   pipeMeta ->
                       pipeMeta
                           .getStaticMeta()
-                          .getConnectorParameters()
+                          .getSinkParameters()
                           .toString()
                           .equals(sortedConnectorParametersString))
               .collect(Collectors.toList()));
@@ -177,10 +177,9 @@ public class PipeTableResp implements DataSet {
               staticMeta.getPipeName(),
               staticMeta.getCreationTime(),
               runtimeMeta.getStatus().get().name(),
-              
SystemConstant.addSystemKeysIfNecessary(staticMeta.getExtractorParameters())
-                  .toString(),
+              
SystemConstant.addSystemKeysIfNecessary(staticMeta.getSourceParameters()).toString(),
               staticMeta.getProcessorParameters().toString(),
-              staticMeta.getConnectorParameters().toString(),
+              staticMeta.getSinkParameters().toString(),
               exceptionMessageBuilder.toString());
       final PipeTemporaryMetaInCoordinator temporaryMeta =
           (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta();
@@ -206,7 +205,7 @@ public class PipeTableResp implements DataSet {
                   .getRegisteredDataNodeCount()
               == 1
           && ConfigRegionListeningFilter.parseListeningPlanTypeSet(
-                  pipeMeta.getStaticMeta().getExtractorParameters())
+                  pipeMeta.getStaticMeta().getSourceParameters())
               .isEmpty();
     } catch (final IllegalPathException e) {
       return false;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index 372b0334a24..e8cc16a1edf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.confignode.manager.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
@@ -156,18 +155,12 @@ public class PipeConfigNodeRuntimeAgent implements 
IService {
 
     pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
 
-    // Stop all pipes locally if critical exception occurs
-    if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
-      PipeConfigNodeAgent.task().stopAllPipesWithCriticalException();
-    }
+    // Do not call "stopAllPipesWithCriticalException" because the sinks are 
not reused in
+    // ConfigNodeSubtask
   }
 
   /////////////////////////// Periodical Job Executor 
///////////////////////////
 
-  public void registerPeriodicalJob(String id, Runnable periodicalJob, long 
intervalInSeconds) {
-    pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
-  }
-
   public void registerPhantomReferenceCleanJob(
       String id, Runnable periodicalJob, long intervalInSeconds) {
     pipePeriodicalPhantomReferenceCleaner.register(id, periodicalJob, 
intervalInSeconds);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 0d6d3fea9d6..2403e353b56 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -26,9 +26,9 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -56,7 +56,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
   private final PipeTaskMeta pipeTaskMeta;
 
   // Pipe plugins for this subtask
-  private PipeExtractor extractor;
+  private PipeExtractor source;
 
   // TODO: currently unused
   @SuppressWarnings("unused")
@@ -65,9 +65,9 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
   public PipeConfigNodeSubtask(
       final String pipeName,
       final long creationTime,
-      final Map<String, String> extractorAttributes,
+      final Map<String, String> sourceAttributes,
       final Map<String, String> processorAttributes,
-      final Map<String, String> connectorAttributes,
+      final Map<String, String> sinkAttributes,
       final PipeTaskMeta pipeTaskMeta)
       throws Exception {
     // We initialize outputPipeConnector by initConnector()
@@ -75,34 +75,34 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
     this.pipeName = pipeName;
     this.pipeTaskMeta = pipeTaskMeta;
 
-    initExtractor(extractorAttributes);
+    initSource(sourceAttributes);
     initProcessor(processorAttributes);
-    initConnector(connectorAttributes);
+    initSink(sinkAttributes);
 
     PipeConfigRegionSinkMetrics.getInstance().register(this);
     PipeEventCommitManager.getInstance()
         .register(pipeName, creationTime, CONFIG_REGION_ID.getId(), pipeName + 
"_" + creationTime);
   }
 
-  private void initExtractor(final Map<String, String> extractorAttributes) 
throws Exception {
-    final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
+  private void initSource(final Map<String, String> sourceAttributes) throws 
Exception {
+    final PipeParameters sourceParameters = new 
PipeParameters(sourceAttributes);
 
     // 1. Construct extractor
-    extractor = 
PipeConfigNodeAgent.plugin().reflectExtractor(extractorParameters);
+    source = PipeConfigNodeAgent.plugin().reflectSource(sourceParameters);
 
     try {
       // 2. Validate extractor parameters
-      extractor.validate(new PipeParameterValidator(extractorParameters));
+      source.validate(new PipeParameterValidator(sourceParameters));
 
       // 3. Customize extractor
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment(
+              new PipeTaskSourceRuntimeEnvironment(
                   pipeName, creationTime, CONFIG_REGION_ID.getId(), 
pipeTaskMeta));
-      extractor.customize(extractorParameters, runtimeConfiguration);
+      source.customize(sourceParameters, runtimeConfiguration);
     } catch (final Exception e) {
       try {
-        extractor.close();
+        source.close();
       } catch (Exception closeException) {
         LOGGER.warn(
             "Failed to close extractor after failed to initialize extractor. "
@@ -131,31 +131,30 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
                 runtimeConfiguration);
   }
 
-  private void initConnector(final Map<String, String> connectorAttributes) 
throws Exception {
-    final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
+  private void initSink(final Map<String, String> sinkAttributes) throws 
Exception {
+    final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
 
     // 1. Construct connector
-    outputPipeConnector = 
PipeConfigNodeAgent.plugin().reflectConnector(connectorParameters);
+    outputPipeSink = PipeConfigNodeAgent.plugin().reflectSink(sinkParameters);
 
     try {
       // 2. Validate connector parameters
-      outputPipeConnector.validate(new 
PipeParameterValidator(connectorParameters));
+      outputPipeSink.validate(new PipeParameterValidator(sinkParameters));
 
       // 3. Customize connector
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
               new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, 
CONFIG_REGION_ID.getId()));
-      outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
+      outputPipeSink.customize(sinkParameters, runtimeConfiguration);
 
       // 4. Handshake
-      outputPipeConnector.handshake();
+      outputPipeSink.handshake();
     } catch (final Exception e) {
       try {
-        outputPipeConnector.close();
+        outputPipeSink.close();
       } catch (final Exception closeException) {
         LOGGER.warn(
-            "Failed to close connector after failed to initialize connector. "
-                + "Ignore this exception.",
+            "Failed to close sink after failed to initialize it. Ignore this 
exception.",
             closeException);
       }
       throw e;
@@ -175,7 +174,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
       return false;
     }
 
-    final Event event = lastEvent != null ? lastEvent : extractor.supply();
+    final Event event = lastEvent != null ? lastEvent : source.supply();
     // Record the last event for retry when exception occurs
     setLastEvent(event);
 
@@ -185,7 +184,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
       }
 
       if (!(event instanceof ProgressReportEvent)) {
-        outputPipeConnector.transfer(event);
+        outputPipeSink.transfer(event);
         PipeConfigRegionSinkMetrics.getInstance().markConfigEvent(taskID);
       }
       decreaseReferenceCountAndReleaseLastEvent(event, true);
@@ -226,7 +225,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
     PipeConfigRegionSinkMetrics.getInstance().deregister(taskID);
 
     try {
-      extractor.close();
+      source.close();
     } catch (final Exception e) {
       LOGGER.info("Error occurred during closing PipeExtractor.", e);
     }
@@ -238,7 +237,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
     }
 
     try {
-      outputPipeConnector.close();
+      outputPipeSink.close();
     } catch (final Exception e) {
       LOGGER.info("Error occurred during closing PipeConnector.", e);
     } finally {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 3f66d8aa7c0..4f90d147e26 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -91,16 +91,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
         && pipeTaskMeta.getLeaderNodeId()
             == ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()
         && !ConfigRegionListeningFilter.parseListeningPlanTypeSet(
-                pipeStaticMeta.getExtractorParameters())
+                pipeStaticMeta.getSourceParameters())
             .isEmpty()) {
       final PipeConfigNodeTask pipeTask =
           new PipeConfigNodeTask(
               new PipeConfigNodeTaskStage(
                   pipeStaticMeta.getPipeName(),
                   pipeStaticMeta.getCreationTime(),
-                  pipeStaticMeta.getExtractorParameters().getAttribute(),
+                  pipeStaticMeta.getSourceParameters().getAttribute(),
                   pipeStaticMeta.getProcessorParameters().getAttribute(),
-                  pipeStaticMeta.getConnectorParameters().getAttribute(),
+                  pipeStaticMeta.getSinkParameters().getAttribute(),
                   pipeTaskMeta));
       pipeTask.create();
       pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
@@ -113,11 +113,6 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent 
{
         .put(consensusGroupId, pipeTaskMeta);
   }
 
-  public void stopAllPipesWithCriticalException() {
-    super.stopAllPipesWithCriticalException(
-        ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId());
-  }
-
   @Override
   protected TPushPipeMetaRespExceptionMessage 
handleSinglePipeMetaChangesInternal(
       final PipeMeta pipeMetaFromCoordinator) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
index bab8badcb59..bdae3f0f07a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskBuilder.java
@@ -52,7 +52,7 @@ public class PipeConfigNodeTaskBuilder {
           && consensusGroupIdToPipeTaskMeta.getValue().getLeaderNodeId()
               == ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()
           && !ConfigRegionListeningFilter.parseListeningPlanTypeSet(
-                  pipeStaticMeta.getExtractorParameters())
+                  pipeStaticMeta.getSourceParameters())
               .isEmpty()) {
         consensusGroupIdToPipeTaskMap.put(
             consensusGroupId,
@@ -60,9 +60,9 @@ public class PipeConfigNodeTaskBuilder {
                 new PipeConfigNodeTaskStage(
                     pipeStaticMeta.getPipeName(),
                     pipeStaticMeta.getCreationTime(),
-                    pipeStaticMeta.getExtractorParameters().getAttribute(),
+                    pipeStaticMeta.getSourceParameters().getAttribute(),
                     pipeStaticMeta.getProcessorParameters().getAttribute(),
-                    pipeStaticMeta.getConnectorParameters().getAttribute(),
+                    pipeStaticMeta.getSinkParameters().getAttribute(),
                     consensusGroupIdToPipeTaskMeta.getValue())));
       }
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 3f92e05f33b..e1b6ce7c707 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -566,6 +566,7 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
 
   private TSStatus visitCommonTablePlan(final ConfigPhysicalPlan plan, final 
TSStatus context) {
     if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()
+        || context.getCode() == 
TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()
         || context.getCode() == TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()
         || context.getCode() == 
TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
         || context.getCode() == 
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 685e7b08451..ef1e08f8536 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -91,7 +91,7 @@ public class IoTDBConfigRegionSource extends 
IoTDBNonDataRegionSource {
         .getConfigNodeConsensusProtocolClass()
         .equals(ConsensusFactory.SIMPLE_CONSENSUS)) {
       throw new PipeException(
-          "IoTDBConfigRegionExtractor does not transferring events under 
simple consensus");
+          "IoTDBConfigRegionSource does not transferring events under simple 
consensus");
     }
 
     super.customize(parameters, configuration);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index 409f436bdf4..0861bed200b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -88,7 +88,7 @@ public class PipeInfo implements SnapshotProcessor {
             () -> {
               try {
                 PipeConfigNodeAgent.runtime()
-                    
.increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters());
+                    
.increaseListenerReference(plan.getPipeStaticMeta().getSourceParameters());
                 return null;
               } catch (final Exception e) {
                 throw new PipeException("Failed to increase listener 
reference", e);
@@ -138,7 +138,7 @@ public class PipeInfo implements SnapshotProcessor {
             meta -> {
               try {
                 PipeConfigNodeAgent.runtime()
-                    
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
+                    
.decreaseListenerReference(meta.getStaticMeta().getSourceParameters());
               } catch (final Exception e) {
                 throw new PipeException("Failed to decrease listener 
reference", e);
               }
@@ -171,12 +171,12 @@ public class PipeInfo implements SnapshotProcessor {
                   
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
       if (message == null) {
         PipeConfigNodeAgent.runtime()
-            
.increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters());
+            
.increaseListenerReference(plan.getPipeStaticMeta().getSourceParameters());
         pipeMetaBeforeAlter.ifPresent(
             meta -> {
               try {
                 PipeConfigNodeAgent.runtime()
-                    
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
+                    
.decreaseListenerReference(meta.getStaticMeta().getSourceParameters());
               } catch (final Exception e) {
                 throw new PipeException("Failed to decrease listener 
reference", e);
               }
@@ -273,7 +273,7 @@ public class PipeInfo implements SnapshotProcessor {
 
       for (final PipeMeta pipeMeta : pipeTaskInfo.getPipeMetaList()) {
         PipeConfigNodeAgent.runtime()
-            
.increaseListenerReference(pipeMeta.getStaticMeta().getExtractorParameters());
+            
.increaseListenerReference(pipeMeta.getStaticMeta().getSourceParameters());
       }
     } catch (final Exception ex) {
       LOGGER.error("Failed to load pipe task info from snapshot", ex);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 93cf882b5b1..6aadee63719 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -214,9 +214,9 @@ public class PipeTaskInfo implements SnapshotProcessor {
         new PipeStaticMeta(
             pipeStaticMetaFromCoordinator.getPipeName(),
             pipeStaticMetaFromCoordinator.getCreationTime(),
-            new 
HashMap<>(pipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute()),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getSourceParameters().getAttribute()),
             new 
HashMap<>(pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute()),
-            new 
HashMap<>(pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute()));
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getSinkParameters().getAttribute()));
 
     // 1. In modify mode, based on the passed attributes:
     //   1.1. if they are empty, the original attributes are filled directly.
@@ -225,11 +225,11 @@ public class PipeTaskInfo implements SnapshotProcessor {
     if (!alterPipeRequest.isReplaceAllExtractorAttributes) { // modify mode
       if (alterPipeRequest.getExtractorAttributes().isEmpty()) {
         alterPipeRequest.setExtractorAttributes(
-            
copiedPipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute());
+            
copiedPipeStaticMetaFromCoordinator.getSourceParameters().getAttribute());
       } else {
         alterPipeRequest.setExtractorAttributes(
             copiedPipeStaticMetaFromCoordinator
-                .getExtractorParameters()
+                .getSourceParameters()
                 .addOrReplaceEquivalentAttributes(
                     new 
PipeParameters(alterPipeRequest.getExtractorAttributes()))
                 .getAttribute());
@@ -253,11 +253,11 @@ public class PipeTaskInfo implements SnapshotProcessor {
     if (!alterPipeRequest.isReplaceAllConnectorAttributes) { // modify mode
       if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
         alterPipeRequest.setConnectorAttributes(
-            
copiedPipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
+            
copiedPipeStaticMetaFromCoordinator.getSinkParameters().getAttribute());
       } else {
         alterPipeRequest.setConnectorAttributes(
             copiedPipeStaticMetaFromCoordinator
-                .getConnectorParameters()
+                .getSinkParameters()
                 .addOrReplaceEquivalentAttributes(
                     new 
PipeParameters(alterPipeRequest.getConnectorAttributes()))
                 .getAttribute());
@@ -396,7 +396,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   private void validatePipePluginUsageByPipeInternal(String pluginName) {
     Iterable<PipeMeta> pipeMetas = getPipeMetaList();
     for (PipeMeta pipeMeta : pipeMetas) {
-      PipeParameters extractorParameters = 
pipeMeta.getStaticMeta().getExtractorParameters();
+      PipeParameters extractorParameters = 
pipeMeta.getStaticMeta().getSourceParameters();
       final String extractorPluginName =
           extractorParameters.getStringOrDefault(
               Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
@@ -420,7 +420,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
         throw new PipeException(exceptionMessage);
       }
 
-      PipeParameters connectorParameters = 
pipeMeta.getStaticMeta().getConnectorParameters();
+      PipeParameters connectorParameters = 
pipeMeta.getStaticMeta().getSinkParameters();
       final String connectorPluginName =
           connectorParameters.getStringOrDefault(
               Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index bd343412660..c48c450157c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -599,9 +599,7 @@ public abstract class AbstractOperatePipeProcedureV2
 
               try {
                 return !DataRegionListeningFilter.shouldDatabaseBeListened(
-                    copiedPipeMeta.getStaticMeta().getExtractorParameters(),
-                    isTableModel,
-                    database);
+                    copiedPipeMeta.getStaticMeta().getSourceParameters(), 
isTableModel, database);
               } catch (final Exception e) {
                 return false;
               }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index ab67185dacd..20e23a64701 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -127,7 +127,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
                   new PipeExternalSourceLoadBalancer(
                       pipeMeta
                           .getStaticMeta()
-                          .getExtractorParameters()
+                          .getSourceParameters()
                           .getStringOrDefault(
                               Arrays.asList(
                                   
PipeSourceConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY,
@@ -136,7 +136,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
               final int parallelism =
                   pipeMeta
                       .getStaticMeta()
-                      .getExtractorParameters()
+                      .getSourceParameters()
                       .getIntOrDefault(
                           Arrays.asList(
                               EXTERNAL_EXTRACTOR_PARALLELISM_KEY, 
EXTERNAL_SOURCE_PARALLELISM_KEY),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 004a3ab47fb..2fd698112fc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -276,7 +276,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       final PipeExternalSourceLoadBalancer loadBalancer =
           new PipeExternalSourceLoadBalancer(
               pipeStaticMeta
-                  .getExtractorParameters()
+                  .getSourceParameters()
                   .getStringOrDefault(
                       Arrays.asList(
                           
PipeSourceConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY,
@@ -284,7 +284,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                       
PipeSourceConstant.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY));
       final int parallelism =
           pipeStaticMeta
-              .getExtractorParameters()
+              .getSourceParameters()
               .getIntOrDefault(
                   Arrays.asList(
                       EXTERNAL_EXTRACTOR_PARALLELISM_KEY, 
EXTERNAL_SOURCE_PARALLELISM_KEY),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
index 06215ba46d1..76756a579f0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/PipeExternalSourceLoadBalancer.java
@@ -96,7 +96,7 @@ public class PipeExternalSourceLoadBalancer {
       // evenly distributed across running DataNodes.
       // 2. If no DataNodes are available, a PipeException is thrown.
       if (pipeStaticMeta
-          .getExtractorParameters()
+          .getSourceParameters()
           .getBooleanOrDefault(
               Arrays.asList(
                   
PipeSourceConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index d15391fc226..a2503fec8a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -171,7 +171,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final PipeTaskMeta pipeTaskMeta)
       throws IllegalPathException {
     if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
-      final PipeParameters sourceParameters = 
pipeStaticMeta.getExtractorParameters();
+      final PipeParameters sourceParameters = 
pipeStaticMeta.getSourceParameters();
       final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
       final boolean needConstructDataRegionTask =
           
StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
@@ -240,7 +240,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     // Check each pipe
     for (final PipeMeta pipeMetaFromCoordinator : pipeMetaListFromCoordinator) 
{
       if (SchemaRegionListeningFilter.parseListeningPlanTypeSet(
-              pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters())
+              pipeMetaFromCoordinator.getStaticMeta().getSourceParameters())
           .isEmpty()) {
         continue;
       }
@@ -359,14 +359,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       // subscribed pipe, so the subscription needs to be manually marked as 
completed.
       if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
         final String topicName =
-            pipeMeta
-                .getStaticMeta()
-                .getConnectorParameters()
-                .getString(PipeSinkConstant.SINK_TOPIC_KEY);
+            
pipeMeta.getStaticMeta().getSinkParameters().getString(PipeSinkConstant.SINK_TOPIC_KEY);
         final String consumerGroupId =
             pipeMeta
                 .getStaticMeta()
-                .getConnectorParameters()
+                .getSinkParameters()
                 .getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
         SubscriptionAgent.broker().updateCompletedTopicNames(consumerGroupId, 
topicName);
       }
@@ -430,14 +427,14 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
         final String sourceModeValue =
             pipeMeta
                 .getStaticMeta()
-                .getExtractorParameters()
+                .getSourceParameters()
                 .getStringOrDefault(
                     Arrays.asList(
                         PipeSourceConstant.EXTRACTOR_MODE_KEY, 
PipeSourceConstant.SOURCE_MODE_KEY),
                     PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
         final boolean includeDataAndNeedDrop =
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
-                        pipeMeta.getStaticMeta().getExtractorParameters())
+                        pipeMeta.getStaticMeta().getSourceParameters())
                     .getLeft()
                 && 
(sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
                     || sourceModeValue.equalsIgnoreCase(
@@ -512,9 +509,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         final boolean includeDataAndNeedDrop =
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
-                        pipeMeta.getStaticMeta().getExtractorParameters())
+                        pipeMeta.getStaticMeta().getSourceParameters())
                     .getLeft()
-                && 
isSnapshotMode(pipeMeta.getStaticMeta().getExtractorParameters());
+                && 
isSnapshotMode(pipeMeta.getStaticMeta().getSourceParameters());
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
index e39779769d5..b55704a3da4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
@@ -63,7 +63,7 @@ public class PipeDataNodeBuilder {
       final PipeTaskMeta pipeTaskMeta = 
consensusGroupIdToPipeTaskMeta.getValue();
 
       if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
-        final PipeParameters extractorParameters = 
pipeStaticMeta.getExtractorParameters();
+        final PipeParameters extractorParameters = 
pipeStaticMeta.getSourceParameters();
         final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
         final boolean needConstructDataRegionTask =
             dataRegionIds.contains(dataRegionId)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 7f0b6b4ff18..bcaf58ed2b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -85,9 +85,9 @@ public class PipeDataNodeTaskBuilder {
 
     // Analyzes the PipeParameters to identify potential conflicts.
     final PipeParameters extractorParameters =
-        blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters());
+        blendUserAndSystemParameters(pipeStaticMeta.getSourceParameters());
     final PipeParameters connectorParameters =
-        blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters());
+        blendUserAndSystemParameters(pipeStaticMeta.getSinkParameters());
     checkConflict(extractorParameters, connectorParameters);
     injectParameters(extractorParameters, connectorParameters);
 
@@ -135,7 +135,7 @@ public class PipeDataNodeTaskBuilder {
             PROCESSOR_EXECUTOR,
             pipeTaskMeta,
             pipeStaticMeta
-                .getConnectorParameters()
+                .getSinkParameters()
                 .getStringOrDefault(
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
index fc3bd646549..57a804df0d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSourceStage.java
@@ -25,7 +25,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.stage.PipeTaskStage;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeExtractor;
@@ -51,8 +51,8 @@ public class PipeTaskSourceStage extends PipeTaskStage {
     pipeExtractor =
         StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))
                 || PipeRuntimeMeta.isSourceExternal(regionId)
-            ? 
PipeDataNodeAgent.plugin().dataRegion().reflectExtractor(extractorParameters)
-            : 
PipeDataNodeAgent.plugin().schemaRegion().reflectExtractor(extractorParameters);
+            ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSource(extractorParameters)
+            : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSource(extractorParameters);
 
     // Validate and customize should be called before createSubtask. this 
allows extractor exposing
     // exceptions in advance.
@@ -63,8 +63,7 @@ public class PipeTaskSourceStage extends PipeTaskStage {
       // 2. Customize extractor
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment(
-                  pipeName, creationTime, regionId, pipeTaskMeta));
+              new PipeTaskSourceRuntimeEnvironment(pipeName, creationTime, 
regionId, pipeTaskMeta));
       pipeExtractor.customize(extractorParameters, runtimeConfiguration);
     } catch (Exception e) {
       try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index acfa13c68c5..d3c94173848 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -108,13 +108,13 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       }
 
       if (event instanceof TabletInsertionEvent) {
-        outputPipeConnector.transfer((TabletInsertionEvent) event);
+        outputPipeSink.transfer((TabletInsertionEvent) event);
         PipeDataRegionSinkMetrics.getInstance().markTabletEvent(taskID);
       } else if (event instanceof TsFileInsertionEvent) {
-        outputPipeConnector.transfer((TsFileInsertionEvent) event);
+        outputPipeSink.transfer((TsFileInsertionEvent) event);
         PipeDataRegionSinkMetrics.getInstance().markTsFileEvent(taskID);
       } else if (event instanceof PipeSchemaRegionWritePlanEvent) {
-        outputPipeConnector.transfer(event);
+        outputPipeSink.transfer(event);
         if (((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
             != PlanNodeType.DELETE_DATA) {
           // Only plan nodes in schema region will be marked, delete data node 
is currently not
@@ -124,7 +124,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       } else if (event instanceof PipeHeartbeatEvent) {
         transferHeartbeatEvent((PipeHeartbeatEvent) event);
       } else {
-        outputPipeConnector.transfer(
+        outputPipeSink.transfer(
             event instanceof UserDefinedEnrichedEvent
                 ? ((UserDefinedEnrichedEvent) event).getUserDefinedEvent()
                 : event);
@@ -171,12 +171,12 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     }
 
     try {
-      outputPipeConnector.heartbeat();
-      outputPipeConnector.transfer(event);
+      outputPipeSink.heartbeat();
+      outputPipeSink.transfer(event);
     } catch (final Exception e) {
       throw new PipeConnectionException(
           "PipeConnector: "
-              + outputPipeConnector.getClass().getName()
+              + outputPipeSink.getClass().getName()
               + "(id: "
               + taskID
               + ")"
@@ -200,11 +200,11 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     isClosed.set(true);
     try {
       final long startTime = System.currentTimeMillis();
-      outputPipeConnector.close();
+      outputPipeSink.close();
       LOGGER.info(
           "Pipe: connector subtask {} ({}) was closed within {} ms",
           taskID,
-          outputPipeConnector,
+          outputPipeSink,
           System.currentTimeMillis() - startTime);
     } catch (final Exception e) {
       LOGGER.info(
@@ -271,8 +271,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       decreaseHighPriorityTaskCount();
     }
 
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, 
regionId);
+    if (outputPipeSink instanceof IoTDBSink) {
+      ((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop, 
regionId);
     }
   }
 
@@ -302,68 +302,68 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
   }
 
   public int getAsyncConnectorRetryEventQueueSize() {
-    return outputPipeConnector instanceof IoTDBDataRegionAsyncSink
-        ? ((IoTDBDataRegionAsyncSink) 
outputPipeConnector).getRetryEventQueueSize()
+    return outputPipeSink instanceof IoTDBDataRegionAsyncSink
+        ? ((IoTDBDataRegionAsyncSink) outputPipeSink).getRetryEventQueueSize()
         : 0;
   }
 
   public int getPendingHandlersSize() {
-    return outputPipeConnector instanceof IoTDBDataRegionAsyncSink
-        ? ((IoTDBDataRegionAsyncSink) 
outputPipeConnector).getPendingHandlersSize()
+    return outputPipeSink instanceof IoTDBDataRegionAsyncSink
+        ? ((IoTDBDataRegionAsyncSink) outputPipeSink).getPendingHandlersSize()
         : 0;
   }
 
   public int getBatchSize() {
-    if (outputPipeConnector instanceof IoTDBDataRegionAsyncSink) {
-      return ((IoTDBDataRegionAsyncSink) outputPipeConnector).getBatchSize();
+    if (outputPipeSink instanceof IoTDBDataRegionAsyncSink) {
+      return ((IoTDBDataRegionAsyncSink) outputPipeSink).getBatchSize();
     }
-    if (outputPipeConnector instanceof IoTDBDataRegionSyncSink) {
-      return ((IoTDBDataRegionSyncSink) outputPipeConnector).getBatchSize();
+    if (outputPipeSink instanceof IoTDBDataRegionSyncSink) {
+      return ((IoTDBDataRegionSyncSink) outputPipeSink).getBatchSize();
     }
     return 0;
   }
 
   public double getTotalUncompressedSize() {
-    return outputPipeConnector instanceof IoTDBSink
-        ? ((IoTDBSink) outputPipeConnector).getTotalUncompressedSize()
+    return outputPipeSink instanceof IoTDBSink
+        ? ((IoTDBSink) outputPipeSink).getTotalUncompressedSize()
         : 0;
   }
 
   public double getTotalCompressedSize() {
-    return outputPipeConnector instanceof IoTDBSink
-        ? ((IoTDBSink) outputPipeConnector).getTotalCompressedSize()
+    return outputPipeSink instanceof IoTDBSink
+        ? ((IoTDBSink) outputPipeSink).getTotalCompressedSize()
         : 0;
   }
 
   public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) 
outputPipeConnector).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+    if (outputPipeSink instanceof IoTDBSink) {
+      ((IoTDBSink) 
outputPipeSink).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
     }
   }
 
   public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) 
outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+    if (outputPipeSink instanceof IoTDBSink) {
+      ((IoTDBSink) 
outputPipeSink).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
     }
   }
 
   public void setTabletBatchTimeIntervalHistogram(Histogram 
tabletBatchTimeIntervalHistogram) {
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) outputPipeConnector)
+    if (outputPipeSink instanceof IoTDBSink) {
+      ((IoTDBSink) outputPipeSink)
           
.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
     }
   }
 
   public void setTsFileBatchTimeIntervalHistogram(Histogram 
tsFileBatchTimeIntervalHistogram) {
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) outputPipeConnector)
+    if (outputPipeSink instanceof IoTDBSink) {
+      ((IoTDBSink) outputPipeSink)
           
.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
     }
   }
 
   public void setEventSizeHistogram(Histogram eventSizeHistogram) {
-    if (outputPipeConnector instanceof IoTDBSink) {
-      ((IoTDBSink) 
outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram);
+    if (outputPipeSink instanceof IoTDBSink) {
+      ((IoTDBSink) 
outputPipeSink).setBatchEventSizeHistogram(eventSizeHistogram);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index e25cf6be67f..c2a5575e4ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -132,10 +132,8 @@ public class PipeSinkSubtaskManager {
       for (int connectorIndex = 0; connectorIndex < connectorNum; 
connectorIndex++) {
         final PipeConnector pipeConnector =
             isDataRegionConnector
-                ? 
PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters)
-                : PipeDataNodeAgent.plugin()
-                    .schemaRegion()
-                    .reflectConnector(pipeConnectorParameters);
+                ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters)
+                : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters);
         // 1. Construct, validate and customize PipeConnector, and then 
handshake (create
         // connection) with the target
         try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 1f5ded9fb5f..cedbea1a13c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -31,7 +31,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
@@ -310,8 +310,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       return;
     }
 
-    final PipeTaskExtractorRuntimeEnvironment environment =
-        (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeTaskSourceRuntimeEnvironment environment =
+        (PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index ce99a182e0b..7c547e94407 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -25,7 +25,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,8 +201,8 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
   public void customize(
       final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception {
-    final PipeTaskExtractorRuntimeEnvironment environment =
-        (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeTaskSourceRuntimeEnvironment environment =
+        (PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
         
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
index 9e42ede3194..f1eb102b32e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTPublishHandler.java
@@ -23,7 +23,7 @@ import 
org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent;
@@ -77,7 +77,7 @@ public class MQTTPublishHandler extends 
AbstractInterceptHandler {
 
   public MQTTPublishHandler(
       final PayloadFormatter payloadFormat,
-      final PipeTaskExtractorRuntimeEnvironment environment,
+      final PipeTaskSourceRuntimeEnvironment environment,
       final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue) {
     this.payloadFormat = payloadFormat;
     useTableInsert = 
PayloadFormatter.TABLE_TYPE.equals(this.payloadFormat.getType());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
index a754fc536af..f4d289b0ef6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/mqtt/MQTTSource.java
@@ -23,7 +23,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPend
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
@@ -165,8 +165,8 @@ public class MQTTSource implements PipeExtractor {
   public void customize(
       final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception {
-    final PipeTaskExtractorRuntimeEnvironment environment =
-        (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
+    final PipeTaskSourceRuntimeEnvironment environment =
+        (PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
     pipeTaskMeta = environment.getPipeTaskMeta();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 9e57532ad26..4d3a607d61f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2104,7 +2104,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 .containsKey(PipeSourceConstant.SOURCE_KEY)
             || alterPipeStatement.isReplaceAllExtractorAttributes()) {
           checkIfSourcePluginChanged(
-              pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(),
+              pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(),
               new PipeParameters(alterPipeStatement.getExtractorAttributes()));
         }
         if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
@@ -2114,18 +2114,18 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               onlyContainsUser(alterPipeStatement.getExtractorAttributes());
           pipeMetaFromCoordinator
               .getStaticMeta()
-              .getExtractorParameters()
+              .getSourceParameters()
               .addOrReplaceEquivalentAttributes(
                   new 
PipeParameters(alterPipeStatement.getExtractorAttributes()));
           extractorAttributes =
-              
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute();
+              
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
           if (onlyContainsUser) {
             checkSourceType(alterPipeStatement.getPipeName(), 
extractorAttributes);
           }
         }
       } else {
         extractorAttributes =
-            
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute();
+            
pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute();
       }
 
       if (!alterPipeStatement.getProcessorAttributes().isEmpty()) {
@@ -2153,18 +2153,18 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               onlyContainsUser(alterPipeStatement.getConnectorAttributes());
           pipeMetaFromCoordinator
               .getStaticMeta()
-              .getConnectorParameters()
+              .getSinkParameters()
               .addOrReplaceEquivalentAttributes(
                   new 
PipeParameters(alterPipeStatement.getConnectorAttributes()));
           connectorAttributes =
-              
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute();
+              
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
           if (onlyContainsUser) {
             checkSinkType(alterPipeStatement.getPipeName(), 
connectorAttributes);
           }
         }
       } else {
         connectorAttributes =
-            
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute();
+            
pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute();
       }
 
       PipeDataNodeAgent.plugin()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 7e19a267dc5..6fb2809c053 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -102,7 +102,7 @@ public class SubscriptionSinkSubtaskManager {
               : new UnboundedBlockingPendingQueue<>(new 
PipeDataRegionEventCounter());
 
       final PipeConnector pipeConnector =
-          
PipeDataNodeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
+          
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters);
       // 1. Construct, validate and customize PipeConnector, and then 
handshake (create connection)
       // with the target
       try {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
index 84de82206fd..5d4d6a640b6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java
@@ -94,7 +94,7 @@ public class PipeDataNodePluginAgentTest {
         IoTDBDataRegionSource.class,
         agent
             .dataRegion()
-            .reflectExtractor(
+            .reflectSource(
                 new PipeParameters(
                     new HashMap<String, String>() {
                       {
@@ -122,7 +122,7 @@ public class PipeDataNodePluginAgentTest {
         IoTDBDataRegionAsyncSink.class,
         agent
             .dataRegion()
-            .reflectConnector(
+            .reflectSink(
                 new PipeParameters(
                     new HashMap<String, String>() {
                       {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index 6064de61973..be9dfadc4f9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -27,7 +27,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
@@ -265,7 +265,7 @@ public class DeletionResourceTest {
             });
     final PipeTaskRuntimeConfiguration configuration =
         new PipeTaskRuntimeConfiguration(
-            new PipeTaskExtractorRuntimeEnvironment(
+            new PipeTaskSourceRuntimeEnvironment(
                 "1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null));
     extractor.customize(parameters, configuration);
     Assert.assertTrue(extractor.shouldExtractDeletion());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 32af67855be..61d5232a3d8 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.pattern;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -103,7 +103,7 @@ public class CachedSchemaPatternMatcherTest {
                 put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, "root");
               }
             }),
-        new PipeTaskRuntimeConfiguration(new 
PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
+        new PipeTaskRuntimeConfiguration(new 
PipeTaskSourceRuntimeEnvironment("1", 1, 1, null)));
     extractors.add(dataRegionExtractor);
 
     final int deviceExtractorNum = 10;
@@ -118,8 +118,7 @@ public class CachedSchemaPatternMatcherTest {
                   put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, "root.db" + 
finalI1);
                 }
               }),
-          new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSourceRuntimeEnvironment("1", 1, 1, null)));
       extractors.add(deviceExtractor);
       for (int j = 0; j < seriesExtractorNum; j++) {
         final PipeRealtimeDataRegionSource seriesExtractor = new 
PipeRealtimeDataRegionFakeSource();
@@ -135,7 +134,7 @@ public class CachedSchemaPatternMatcherTest {
                   }
                 }),
             new PipeTaskRuntimeConfiguration(
-                new PipeTaskExtractorRuntimeEnvironment("1", 1, 1, null)));
+                new PipeTaskSourceRuntimeEnvironment("1", 1, 1, null)));
         extractors.add(seriesExtractor);
       }
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
index 41e6f04fc08..59aec4b1f7b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
@@ -150,28 +150,28 @@ public class PipeRealtimeExtractTest {
 
       final PipeTaskRuntimeConfiguration configuration0 =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment(
+              new PipeTaskSourceRuntimeEnvironment(
                   "1",
                   1,
                   Integer.parseInt(dataRegion1),
                   new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration1 =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment(
+              new PipeTaskSourceRuntimeEnvironment(
                   "1",
                   1,
                   Integer.parseInt(dataRegion1),
                   new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration2 =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment(
+              new PipeTaskSourceRuntimeEnvironment(
                   "1",
                   1,
                   Integer.parseInt(dataRegion2),
                   new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));
       final PipeTaskRuntimeConfiguration configuration3 =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskExtractorRuntimeEnvironment(
+              new PipeTaskSourceRuntimeEnvironment(
                   "1",
                   1,
                   Integer.parseInt(dataRegion2),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
index ff61d5527d6..33acd4beb63 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/PipePluginAgent.java
@@ -69,7 +69,7 @@ public abstract class PipePluginAgent {
   protected abstract PipeSinkConstructor createPipeConnectorConstructor(
       PipePluginMetaKeeper pipePluginMetaKeeper);
 
-  public final PipeExtractor reflectExtractor(PipeParameters 
extractorParameters) {
+  public final PipeExtractor reflectSource(PipeParameters extractorParameters) 
{
     return pipeExtractorConstructor.reflectPlugin(extractorParameters);
   }
 
@@ -77,7 +77,7 @@ public abstract class PipePluginAgent {
     return pipeProcessorConstructor.reflectPlugin(processorParameters);
   }
 
-  public final PipeConnector reflectConnector(PipeParameters 
connectorParameters) {
+  public final PipeConnector reflectSink(PipeParameters connectorParameters) {
     return pipeSinkConstructor.reflectPlugin(connectorParameters);
   }
 
@@ -95,7 +95,7 @@ public abstract class PipePluginAgent {
   protected PipeExtractor validateExtractor(Map<String, String> 
extractorAttributes)
       throws Exception {
     final PipeParameters extractorParameters = new 
PipeParameters(extractorAttributes);
-    final PipeExtractor temporaryExtractor = 
reflectExtractor(extractorParameters);
+    final PipeExtractor temporaryExtractor = 
reflectSource(extractorParameters);
     try {
       temporaryExtractor.validate(new 
PipeParameterValidator(extractorParameters));
     } finally {
@@ -127,7 +127,7 @@ public abstract class PipePluginAgent {
   protected PipeConnector validateConnector(
       String pipeName, Map<String, String> connectorAttributes) throws 
Exception {
     final PipeParameters connectorParameters = new 
PipeParameters(connectorAttributes);
-    final PipeConnector temporaryConnector = 
reflectConnector(connectorParameters);
+    final PipeConnector temporaryConnector = reflectSink(connectorParameters);
     try {
       temporaryConnector.validate(new 
PipeParameterValidator(connectorParameters));
       temporaryConnector.customize(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 10c92eb2695..612704318aa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -480,9 +480,9 @@ public abstract class PipeTaskAgent {
 
     calculateMemoryUsage(
         pipeMetaFromCoordinator.getStaticMeta(),
-        pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(),
+        pipeMetaFromCoordinator.getStaticMeta().getSourceParameters(),
         pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(),
-        pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters());
+        pipeMetaFromCoordinator.getStaticMeta().getSinkParameters());
 
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
@@ -1012,7 +1012,7 @@ public abstract class PipeTaskAgent {
                         for (final PipeRuntimeException e : 
pipeTaskMeta.getExceptionMessages()) {
                           if (e instanceof PipeRuntimeSinkCriticalException) {
                             reusedConnectorParameters2ExceptionMap.putIfAbsent(
-                                staticMeta.getConnectorParameters(),
+                                staticMeta.getSinkParameters(),
                                 (PipeRuntimeSinkCriticalException) e);
                           }
                         }
@@ -1032,13 +1032,13 @@ public abstract class PipeTaskAgent {
                       pipeTaskMeta -> {
                         if (pipeTaskMeta.getLeaderNodeId() == currentNodeId
                             && 
reusedConnectorParameters2ExceptionMap.containsKey(
-                                staticMeta.getConnectorParameters())
+                                staticMeta.getSinkParameters())
                             && !pipeTaskMeta.containsExceptionMessage(
                                 reusedConnectorParameters2ExceptionMap.get(
-                                    staticMeta.getConnectorParameters()))) {
+                                    staticMeta.getSinkParameters()))) {
                           final PipeRuntimeSinkCriticalException exception =
                               reusedConnectorParameters2ExceptionMap.get(
-                                  staticMeta.getConnectorParameters());
+                                  staticMeta.getSinkParameters());
                           pipeTaskMeta.trackExceptionMessage(exception);
                           LOGGER.warn(
                               "Pipe {} (creation time = {}) will be stopped 
because of critical exception "
@@ -1046,7 +1046,7 @@ public abstract class PipeTaskAgent {
                               staticMeta.getPipeName(),
                               staticMeta.getCreationTime(),
                               exception.getTimeStamp(),
-                              staticMeta.getConnectorParameters());
+                              staticMeta.getSinkParameters());
                         }
                       });
             });
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
index 3fa94cf9870..4b57cd64e43 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java
@@ -43,9 +43,9 @@ public class PipeStaticMeta {
   private String pipeName;
   private long creationTime;
 
-  private PipeParameters extractorParameters;
+  private PipeParameters sourceParameters;
   private PipeParameters processorParameters;
-  private PipeParameters connectorParameters;
+  private PipeParameters sinkParameters;
 
   private PipeStaticMeta() {
     // Empty constructor
@@ -59,9 +59,9 @@ public class PipeStaticMeta {
       final Map<String, String> connectorAttributes) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
-    extractorParameters = new PipeParameters(extractorAttributes);
+    sourceParameters = new PipeParameters(extractorAttributes);
     processorParameters = new PipeParameters(processorAttributes);
-    connectorParameters = new PipeParameters(connectorAttributes);
+    sinkParameters = new PipeParameters(connectorAttributes);
   }
 
   public String getPipeName() {
@@ -72,16 +72,16 @@ public class PipeStaticMeta {
     return creationTime;
   }
 
-  public PipeParameters getExtractorParameters() {
-    return extractorParameters;
+  public PipeParameters getSourceParameters() {
+    return sourceParameters;
   }
 
   public PipeParameters getProcessorParameters() {
     return processorParameters;
   }
 
-  public PipeParameters getConnectorParameters() {
-    return connectorParameters;
+  public PipeParameters getSinkParameters() {
+    return sinkParameters;
   }
 
   public PipeType getPipeType() {
@@ -90,7 +90,7 @@ public class PipeStaticMeta {
 
   public boolean isSourceExternal() {
     return !BuiltinPipePlugin.BUILTIN_SOURCES.contains(
-        extractorParameters
+        sourceParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
                 BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
@@ -108,8 +108,8 @@ public class PipeStaticMeta {
     ReadWriteIOUtils.write(pipeName, outputStream);
     ReadWriteIOUtils.write(creationTime, outputStream);
 
-    ReadWriteIOUtils.write(extractorParameters.getAttribute().size(), 
outputStream);
-    for (final Map.Entry<String, String> entry : 
extractorParameters.getAttribute().entrySet()) {
+    ReadWriteIOUtils.write(sourceParameters.getAttribute().size(), 
outputStream);
+    for (final Map.Entry<String, String> entry : 
sourceParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
@@ -118,8 +118,8 @@ public class PipeStaticMeta {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
-    ReadWriteIOUtils.write(connectorParameters.getAttribute().size(), 
outputStream);
-    for (final Map.Entry<String, String> entry : 
connectorParameters.getAttribute().entrySet()) {
+    ReadWriteIOUtils.write(sinkParameters.getAttribute().size(), outputStream);
+    for (final Map.Entry<String, String> entry : 
sinkParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
@@ -131,15 +131,15 @@ public class PipeStaticMeta {
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
     pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
 
-    pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.sourceParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
-    pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.sinkParameters = new PipeParameters(new HashMap<>());
 
     int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(inputStream);
       final String value = ReadWriteIOUtils.readString(inputStream);
-      pipeStaticMeta.extractorParameters.getAttribute().put(key, value);
+      pipeStaticMeta.sourceParameters.getAttribute().put(key, value);
     }
     size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
@@ -151,7 +151,7 @@ public class PipeStaticMeta {
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(inputStream);
       final String value = ReadWriteIOUtils.readString(inputStream);
-      pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
+      pipeStaticMeta.sinkParameters.getAttribute().put(key, value);
     }
 
     return pipeStaticMeta;
@@ -163,15 +163,15 @@ public class PipeStaticMeta {
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
     pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
 
-    pipeStaticMeta.extractorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.sourceParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
-    pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.sinkParameters = new PipeParameters(new HashMap<>());
 
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(byteBuffer);
       final String value = ReadWriteIOUtils.readString(byteBuffer);
-      pipeStaticMeta.extractorParameters.getAttribute().put(key, value);
+      pipeStaticMeta.sourceParameters.getAttribute().put(key, value);
     }
     size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
@@ -183,7 +183,7 @@ public class PipeStaticMeta {
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(byteBuffer);
       final String value = ReadWriteIOUtils.readString(byteBuffer);
-      pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
+      pipeStaticMeta.sinkParameters.getAttribute().put(key, value);
     }
 
     return pipeStaticMeta;
@@ -200,15 +200,15 @@ public class PipeStaticMeta {
     final PipeStaticMeta that = (PipeStaticMeta) obj;
     return pipeName.equals(that.pipeName)
         && creationTime == that.creationTime
-        && extractorParameters.equals(that.extractorParameters)
+        && sourceParameters.equals(that.sourceParameters)
         && processorParameters.equals(that.processorParameters)
-        && connectorParameters.equals(that.connectorParameters);
+        && sinkParameters.equals(that.sinkParameters);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        pipeName, creationTime, extractorParameters, processorParameters, 
connectorParameters);
+        pipeName, creationTime, sourceParameters, processorParameters, 
sinkParameters);
   }
 
   @Override
@@ -218,12 +218,12 @@ public class PipeStaticMeta {
         + pipeName
         + "', creationTime="
         + creationTime
-        + ", extractorParameters="
-        + extractorParameters
+        + ", sourceParameters="
+        + sourceParameters
         + ", processorParameters="
         + processorParameters
-        + ", connectorParameters="
-        + connectorParameters
+        + ", sinkParameters="
+        + sinkParameters
         + "}";
   }
 
@@ -246,7 +246,7 @@ public class PipeStaticMeta {
 
   public boolean visibleUnder(final boolean isTableModel) {
     final Visibility visibility =
-        VisibilityUtils.calculateFromExtractorParameters(extractorParameters);
+        VisibilityUtils.calculateFromExtractorParameters(sourceParameters);
     return VisibilityUtils.isCompatible(visibility, isTableModel);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 62cce7438ba..66d43e0743c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -40,7 +40,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeAbstractSinkSubtask.class);
 
   // For output (transfer events to the target system in connector)
-  protected PipeConnector outputPipeConnector;
+  protected PipeConnector outputPipeSink;
 
   // For thread pool to execute callbacks
   protected ExecutorService subtaskCallbackListeningExecutor;
@@ -54,9 +54,9 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   protected volatile Event lastExceptionEvent;
 
   protected PipeAbstractSinkSubtask(
-      final String taskID, final long creationTime, final PipeConnector 
outputPipeConnector) {
+      final String taskID, final long creationTime, final PipeConnector 
outputPipeSink) {
     super(taskID, creationTime);
-    this.outputPipeConnector = outputPipeConnector;
+    this.outputPipeSink = outputPipeSink;
   }
 
   @Override
@@ -149,23 +149,23 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   private boolean onPipeConnectionException(final Throwable throwable) {
     LOGGER.warn(
         "PipeConnectionException occurred, {} retries to handshake with the 
target system.",
-        outputPipeConnector.getClass().getName(),
+        outputPipeSink.getClass().getName(),
         throwable);
 
     int retry = 0;
     while (retry < MAX_RETRY_TIMES) {
       try {
-        outputPipeConnector.handshake();
+        outputPipeSink.handshake();
         LOGGER.info(
             "{} handshakes with the target system successfully.",
-            outputPipeConnector.getClass().getName());
+            outputPipeSink.getClass().getName());
         break;
       } catch (final Exception e) {
         retry++;
         LOGGER.warn(
             "{} failed to handshake with the target system for {} times, "
                 + "will retry at most {} times.",
-            outputPipeConnector.getClass().getName(),
+            outputPipeSink.getClass().getName(),
             retry,
             MAX_RETRY_TIMES,
             e);
@@ -193,7 +193,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
               + "stopping current subtask {} (creation time: {}, simple class: 
{}). "
               + "Status shown when query the pipe will be 'STOPPED'. "
               + "Please restart the task by executing 'START PIPE' manually if 
needed.",
-          outputPipeConnector.getClass().getName(),
+          outputPipeSink.getClass().getName(),
           MAX_RETRY_TIMES,
           taskID,
           creationTime,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSourceRuntimeEnvironment.java
similarity index 81%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSourceRuntimeEnvironment.java
index c546ab47278..cfecf9c231a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskExtractorRuntimeEnvironment.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSourceRuntimeEnvironment.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.commons.pipe.config.plugin.env;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 
-public class PipeTaskExtractorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+public class PipeTaskSourceRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
 
   private final PipeTaskMeta pipeTaskMeta;
 
-  public PipeTaskExtractorRuntimeEnvironment(
-      String pipeName, long creationTime, int regionId, PipeTaskMeta 
pipeTaskMeta) {
+  public PipeTaskSourceRuntimeEnvironment(
+      final String pipeName,
+      final long creationTime,
+      final int regionId,
+      final PipeTaskMeta pipeTaskMeta) {
     super(pipeName, creationTime, regionId);
     this.pipeTaskMeta = pipeTaskMeta;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
index 65bb977a624..c16f2310629 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBSource.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.source;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
-import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.pipe.api.PipeExtractor;
@@ -149,8 +149,8 @@ public abstract class IoTDBSource implements PipeExtractor {
   public void customize(
       final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws Exception {
-    final PipeTaskExtractorRuntimeEnvironment environment =
-        ((PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment());
+    final PipeTaskSourceRuntimeEnvironment environment =
+        ((PipeTaskSourceRuntimeEnvironment) 
configuration.getRuntimeEnvironment());
     regionId = environment.getRegionId();
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();


Reply via email to