This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f6b16ed0aec Pipe: Added permission check for config receiver (#14418)
f6b16ed0aec is described below

commit f6b16ed0aec7e90289b8412a2bc7857eef263ff2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 16 22:58:46 2024 +0800

    Pipe: Added permission check for config receiver (#14418)
---
 .../pipe/it/manual/IoTDBPipePermissionIT.java      |  71 +++++++++++-
 .../iotdb/confignode/manager/ConfigManager.java    |   4 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  10 +-
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 119 +++++++++++++++++++++
 .../impl/schema/AlterLogicalViewProcedure.java     | 108 ++++++++++---------
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  26 +++--
 .../metadata/write/view/AlterLogicalViewNode.java  |  47 +++++---
 .../db/queryengine/plan/statement/Statement.java   |   8 +-
 8 files changed, 309 insertions(+), 84 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
index c35ff5e9455..648899b13ea 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -100,7 +100,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
             "create user `thulab` 'passwd'",
             "create role `admin`",
             "grant role `admin` to `thulab`",
-            "grant WRITE, READ, MANAGE_DATABASE on root.** to role `admin`"))) 
{
+            "grant WRITE, READ, MANAGE_DATABASE, MANAGE_USER on root.** to 
role `admin`"))) {
       return;
     }
 
@@ -113,6 +113,7 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
           Arrays.asList(
+              "create user user 'passwd'",
               "create timeseries root.ln.wf02.wt01.temperature with 
datatype=INT64,encoding=PLAIN",
               "create timeseries root.ln.wf02.wt01.status with 
datatype=BOOLEAN,encoding=PLAIN",
               "insert into root.ln.wf02.wt01(time, temperature, status) values 
(1800000000000, 23, true)"))) {
@@ -143,6 +144,11 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
 
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "list user",
+          "User,",
+          new HashSet<>(Arrays.asList("root,", "user,", "thulab,")));
       final Set<String> expectedResSet = new HashSet<>();
       expectedResSet.add(
           
"root.ln.wf02.wt01.temperature,null,root.ln,INT64,PLAIN,LZ4,null,null,null,null,BASE,");
@@ -162,4 +168,67 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualManualIT {
           Collections.singleton("1800000000000,23,true,"));
     }
   }
+
+  @Test
+  public void testNoPermission() throws Exception {
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        receiverEnv,
+        Arrays.asList(
+            "create user `thulab` 'passwd'",
+            "create role `admin`",
+            "grant role `admin` to `thulab`",
+            "grant READ, MANAGE_DATABASE on root.ln.** to role `admin`"))) {
+      return;
+    }
+
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "create user someUser 'passwd'",
+              "create database root.noPermission",
+              "create timeseries root.ln.wf02.wt01.status with 
datatype=BOOLEAN,encoding=PLAIN"))) {
+        fail();
+        return;
+      }
+      awaitUntilFlush(senderEnv);
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.inclusion", "all");
+
+      connectorAttributes.put("connector", "iotdb-thrift-async-connector");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.username", "thulab");
+      connectorAttributes.put("connector.password", "passwd");
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, "count databases", "count,", 
Collections.singleton("1,"));
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv,
+          "show timeseries",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          Collections.emptySet());
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv, "list user", "User,", Collections.singleton("root,"));
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 06d256f236e..35f5559231f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -2185,8 +2185,8 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
-    TSStatus status = confirmLeader();
+  public TSStatus alterLogicalView(final TAlterLogicalViewReq req) {
+    final TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return procedureManager.alterLogicalView(req);
     } else {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 29a890a90ad..e18e02e4ab8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -357,11 +357,11 @@ public class ProcedureManager {
     return waitingProcedureFinished(procedure);
   }
 
-  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
-    String queryId = req.getQueryId();
-    ByteBuffer byteBuffer = ByteBuffer.wrap(req.getViewBinary());
-    Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
-    int size = byteBuffer.getInt();
+  public TSStatus alterLogicalView(final TAlterLogicalViewReq req) {
+    final String queryId = req.getQueryId();
+    final ByteBuffer byteBuffer = ByteBuffer.wrap(req.getViewBinary());
+    final Map<PartialPath, ViewExpression> viewPathToSourceMap = new 
HashMap<>();
+    final int size = byteBuffer.getInt();
     PartialPath path;
     ViewExpression viewExpression;
     for (int i = 0; i < size; i++) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index a5e8a7248f4..656161bdb9f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.confignode.manager.pipe.receiver.protocol;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
@@ -202,6 +205,15 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus executePlanAndClassifyExceptions(final ConfigPhysicalPlan 
plan) {
     TSStatus result;
     try {
+      result = checkPermission(plan);
+      if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.warn(
+            "Receiver id = {}: Permission check failed while executing plan 
{}: {}",
+            receiverId.get(),
+            plan,
+            result);
+        return result;
+      }
       result = executePlan(plan);
       if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.warn(
@@ -222,6 +234,113 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
     return result;
   }
 
+  private TSStatus checkPermission(final ConfigPhysicalPlan plan) {
+    switch (plan.getType()) {
+      case CreateDatabase:
+      case AlterDatabase:
+      case DeleteDatabase:
+        return configManager
+            .checkUserPrivileges(
+                username, Collections.emptyList(), 
PrivilegeType.MANAGE_DATABASE.ordinal())
+            .getStatus();
+      case ExtendSchemaTemplate:
+        return configManager
+            .checkUserPrivileges(
+                username, Collections.emptyList(), 
PrivilegeType.EXTEND_TEMPLATE.ordinal())
+            .getStatus();
+      case CreateSchemaTemplate:
+      case CommitSetSchemaTemplate:
+      case PipeUnsetTemplate:
+        return 
CommonDescriptor.getInstance().getConfig().getAdminName().equals(username)
+            ? StatusUtils.OK
+            : new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
+                .setMessage("Only the admin user can perform this operation");
+      case PipeDeleteTimeSeries:
+        return configManager
+            .checkUserPrivileges(
+                username,
+                new ArrayList<>(
+                    PathPatternTree.deserialize(
+                            ((PipeDeleteTimeSeriesPlan) 
plan).getPatternTreeBytes())
+                        .getAllPathPatterns()),
+                PrivilegeType.WRITE_SCHEMA.ordinal())
+            .getStatus();
+      case PipeDeleteLogicalView:
+        return configManager
+            .checkUserPrivileges(
+                username,
+                new ArrayList<>(
+                    PathPatternTree.deserialize(
+                            ((PipeDeleteLogicalViewPlan) 
plan).getPatternTreeBytes())
+                        .getAllPathPatterns()),
+                PrivilegeType.WRITE_SCHEMA.ordinal())
+            .getStatus();
+      case PipeDeactivateTemplate:
+        return configManager
+            .checkUserPrivileges(
+                username,
+                new ArrayList<>(((PipeDeactivateTemplatePlan) 
plan).getTemplateSetInfo().keySet()),
+                PrivilegeType.WRITE_SCHEMA.ordinal())
+            .getStatus();
+      case SetTTL:
+        return configManager
+            .checkUserPrivileges(
+                username,
+                Collections.singletonList(new PartialPath(((SetTTLPlan) 
plan).getPathPattern())),
+                PrivilegeType.WRITE_SCHEMA.ordinal())
+            .getStatus();
+      case UpdateTriggerStateInTable:
+      case DeleteTriggerInTable:
+        return configManager
+            .checkUserPrivileges(
+                username, Collections.emptyList(), 
PrivilegeType.USE_TRIGGER.ordinal())
+            .getStatus();
+      case GrantRole:
+      case GrantUser:
+      case RevokeUser:
+      case RevokeRole:
+        for (final int permission : ((AuthorPlan) plan).getPermissions()) {
+          final TSStatus status =
+              configManager
+                  .checkUserPrivilegeGrantOpt(
+                      username,
+                      PrivilegeType.isPathRelevant(permission)
+                          ? ((AuthorPlan) plan).getNodeNameList()
+                          : Collections.emptyList(),
+                      permission)
+                  .getStatus();
+          if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) 
{
+            return status;
+          }
+        }
+        return StatusUtils.OK;
+      case UpdateUser:
+        return ((AuthorPlan) plan).getUserName().equals(username)
+            ? StatusUtils.OK
+            : configManager
+                .checkUserPrivileges(
+                    username, Collections.emptyList(), 
PrivilegeType.MANAGE_USER.ordinal())
+                .getStatus();
+      case CreateUser:
+      case CreateUserWithRawPassword:
+      case DropUser:
+        return configManager
+            .checkUserPrivileges(
+                username, Collections.emptyList(), 
PrivilegeType.MANAGE_USER.ordinal())
+            .getStatus();
+      case CreateRole:
+      case DropRole:
+      case GrantRoleToUser:
+      case RevokeRoleFromUser:
+        return configManager
+            .checkUserPrivileges(
+                username, Collections.emptyList(), 
PrivilegeType.MANAGE_ROLE.ordinal())
+            .getStatus();
+      default:
+        return StatusUtils.OK;
+    }
+  }
+
   private TSStatus executePlan(final ConfigPhysicalPlan plan) throws 
ConsensusException {
     switch (plan.getType()) {
       case CreateDatabase:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
index ece7b0d8c87..7db6c583aae 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
@@ -74,14 +74,14 @@ public class AlterLogicalViewProcedure
   private transient PathPatternTree pathPatternTree;
   private transient ByteBuffer patternTreeBytes;
 
-  public AlterLogicalViewProcedure(boolean isGeneratedByPipe) {
+  public AlterLogicalViewProcedure(final boolean isGeneratedByPipe) {
     super(isGeneratedByPipe);
   }
 
   public AlterLogicalViewProcedure(
-      String queryId,
-      Map<PartialPath, ViewExpression> viewPathToSourceMap,
-      boolean isGeneratedByPipe) {
+      final String queryId,
+      final Map<PartialPath, ViewExpression> viewPathToSourceMap,
+      final boolean isGeneratedByPipe) {
     super(isGeneratedByPipe);
     this.queryId = queryId;
     this.viewPathToSourceMap = viewPathToSourceMap;
@@ -89,9 +89,10 @@ public class AlterLogicalViewProcedure
   }
 
   @Override
-  protected Flow executeFromState(ConfigNodeProcedureEnv env, 
AlterLogicalViewState state)
+  protected Flow executeFromState(
+      final ConfigNodeProcedureEnv env, final AlterLogicalViewState state)
       throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
-    long startTime = System.currentTimeMillis();
+    final long startTime = System.currentTimeMillis();
     try {
       switch (state) {
         case CLEAN_DATANODE_SCHEMA_CACHE:
@@ -103,7 +104,7 @@ public class AlterLogicalViewProcedure
           LOGGER.info("Alter view {}", viewPathToSourceMap.keySet());
           try {
             alterLogicalView(env);
-          } catch (ProcedureException e) {
+          } catch (final ProcedureException e) {
             setFailure(e);
           }
           return Flow.NO_MORE_STATE;
@@ -117,17 +118,17 @@ public class AlterLogicalViewProcedure
     }
   }
 
-  private void invalidateCache(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  private void invalidateCache(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TInvalidateMatchedSchemaCacheReq, TSStatus> 
clientHandler =
+    final DataNodeAsyncRequestContext<TInvalidateMatchedSchemaCacheReq, 
TSStatus> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
             new TInvalidateMatchedSchemaCacheReq(patternTreeBytes),
             dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
+    final Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+    for (final TSStatus status : statusMap.values()) {
       // all dataNodes must clear the related schemaengine cache
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.error(
@@ -140,17 +141,17 @@ public class AlterLogicalViewProcedure
     }
   }
 
-  private void alterLogicalView(ConfigNodeProcedureEnv env) throws 
ProcedureException {
-    Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup =
+  private void alterLogicalView(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    final Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup =
         env.getConfigManager().getRelatedSchemaRegionGroup(pathPatternTree);
-    Map<TConsensusGroupId, Map<PartialPath, ViewExpression>> 
schemaRegionRequestMap =
+    final Map<TConsensusGroupId, Map<PartialPath, ViewExpression>> 
schemaRegionRequestMap =
         new HashMap<>();
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
+    for (final Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
       schemaRegionRequestMap
           .computeIfAbsent(getBelongedSchemaRegion(env, entry.getKey()), k -> 
new HashMap<>())
           .put(entry.getKey(), entry.getValue());
     }
-    AlterLogicalViewRegionTaskExecutor<TAlterViewReq> regionTaskExecutor =
+    final AlterLogicalViewRegionTaskExecutor<TAlterViewReq> regionTaskExecutor 
=
         new AlterLogicalViewRegionTaskExecutor<>(
             "Alter view",
             env,
@@ -187,16 +188,16 @@ public class AlterLogicalViewProcedure
   }
 
   private TConsensusGroupId getBelongedSchemaRegion(
-      ConfigNodeProcedureEnv env, PartialPath viewPath) throws 
ProcedureException {
-    PathPatternTree patternTree = new PathPatternTree();
+      final ConfigNodeProcedureEnv env, final PartialPath viewPath) throws 
ProcedureException {
+    final PathPatternTree patternTree = new PathPatternTree();
     patternTree.appendFullPath(viewPath);
     patternTree.constructTree();
-    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> 
schemaPartitionTable =
+    final Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> 
schemaPartitionTable =
         env.getConfigManager().getSchemaPartition(patternTree, 
false).schemaPartitionTable;
     if (schemaPartitionTable.isEmpty()) {
       throw new ProcedureException(new 
ViewNotExistException(viewPath.getFullPath()));
     } else {
-      Map<TSeriesPartitionSlot, TConsensusGroupId> slotMap =
+      final Map<TSeriesPartitionSlot, TConsensusGroupId> slotMap =
           schemaPartitionTable.values().iterator().next();
       if (slotMap.isEmpty()) {
         throw new ProcedureException(new 
ViewNotExistException(viewPath.getFullPath()));
@@ -207,24 +208,24 @@ public class AlterLogicalViewProcedure
   }
 
   @Override
-  protected boolean isRollbackSupported(AlterLogicalViewState 
alterLogicalViewState) {
+  protected boolean isRollbackSupported(final AlterLogicalViewState 
alterLogicalViewState) {
     return true;
   }
 
   @Override
   protected void rollbackState(
-      ConfigNodeProcedureEnv env, AlterLogicalViewState alterLogicalViewState)
+      final ConfigNodeProcedureEnv env, final AlterLogicalViewState 
alterLogicalViewState)
       throws IOException, InterruptedException, ProcedureException {
     invalidateCache(env);
   }
 
   @Override
-  protected AlterLogicalViewState getState(int stateId) {
+  protected AlterLogicalViewState getState(final int stateId) {
     return AlterLogicalViewState.values()[stateId];
   }
 
   @Override
-  protected int getStateId(AlterLogicalViewState alterLogicalViewState) {
+  protected int getStateId(final AlterLogicalViewState alterLogicalViewState) {
     return alterLogicalViewState.ordinal();
   }
 
@@ -238,16 +239,16 @@ public class AlterLogicalViewProcedure
   }
 
   private void generatePathPatternTree() {
-    PathPatternTree patternTree = new PathPatternTree();
-    for (PartialPath path : viewPathToSourceMap.keySet()) {
+    final PathPatternTree patternTree = new PathPatternTree();
+    for (final PartialPath path : viewPathToSourceMap.keySet()) {
       patternTree.appendFullPath(path);
     }
     patternTree.constructTree();
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
+    final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+    final DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
     try {
       patternTree.serialize(dataOutputStream);
-    } catch (IOException ignored) {
+    } catch (final IOException ignored) {
       // won't reach here
     }
 
@@ -256,7 +257,7 @@ public class AlterLogicalViewProcedure
   }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {
+  public void serialize(final DataOutputStream stream) throws IOException {
     stream.writeShort(
         isGeneratedByPipe
             ? 
ProcedureType.PIPE_ENRICHED_ALTER_LOGICAL_VIEW_PROCEDURE.getTypeCode()
@@ -264,19 +265,19 @@ public class AlterLogicalViewProcedure
     super.serialize(stream);
     ReadWriteIOUtils.write(queryId, stream);
     ReadWriteIOUtils.write(this.viewPathToSourceMap.size(), stream);
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
+    for (final Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
       entry.getKey().serialize(stream);
       ViewExpression.serialize(entry.getValue(), stream);
     }
   }
 
   @Override
-  public void deserialize(ByteBuffer byteBuffer) {
+  public void deserialize(final ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     queryId = ReadWriteIOUtils.readString(byteBuffer);
 
-    Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
-    int size = byteBuffer.getInt();
+    final Map<PartialPath, ViewExpression> viewPathToSourceMap = new 
HashMap<>();
+    final int size = byteBuffer.getInt();
     PartialPath path;
     ViewExpression viewExpression;
     for (int i = 0; i < size; i++) {
@@ -289,10 +290,14 @@ public class AlterLogicalViewProcedure
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof AlterLogicalViewProcedure)) return false;
-    AlterLogicalViewProcedure that = (AlterLogicalViewProcedure) o;
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof AlterLogicalViewProcedure)) {
+      return false;
+    }
+    final AlterLogicalViewProcedure that = (AlterLogicalViewProcedure) o;
     return Objects.equals(getProcId(), that.getProcId())
         && Objects.equals(getCurrentState(), that.getCurrentState())
         && Objects.equals(getCycles(), that.getCycles())
@@ -320,27 +325,27 @@ public class AlterLogicalViewProcedure
     private final List<TSStatus> failureStatusList = new ArrayList<>();
 
     AlterLogicalViewRegionTaskExecutor(
-        String taskName,
-        ConfigNodeProcedureEnv env,
-        Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup,
-        CnToDnAsyncRequestType dataNodeRequestType,
-        BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> 
dataNodeRequestGenerator) {
+        final String taskName,
+        final ConfigNodeProcedureEnv env,
+        final Map<TConsensusGroupId, TRegionReplicaSet> 
targetSchemaRegionGroup,
+        final CnToDnAsyncRequestType dataNodeRequestType,
+        final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> 
dataNodeRequestGenerator) {
       super(env, targetSchemaRegionGroup, false, dataNodeRequestType, 
dataNodeRequestGenerator);
       this.taskName = taskName;
     }
 
     @Override
     protected List<TConsensusGroupId> processResponseOfOneDataNode(
-        TDataNodeLocation dataNodeLocation,
-        List<TConsensusGroupId> consensusGroupIdList,
-        TSStatus response) {
-      List<TConsensusGroupId> failedRegionList = new ArrayList<>();
+        final TDataNodeLocation dataNodeLocation,
+        final List<TConsensusGroupId> consensusGroupIdList,
+        final TSStatus response) {
+      final List<TConsensusGroupId> failedRegionList = new ArrayList<>();
       if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return failedRegionList;
       }
 
       if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-        List<TSStatus> subStatusList = response.getSubStatus();
+        final List<TSStatus> subStatusList = response.getSubStatus();
         TSStatus subStatus;
         for (int i = 0; i < subStatusList.size(); i++) {
           subStatus = subStatusList.get(i);
@@ -359,7 +364,7 @@ public class AlterLogicalViewProcedure
       return failedRegionList;
     }
 
-    private void collectFailure(TSStatus failureStatus) {
+    private void collectFailure(final TSStatus failureStatus) {
       if (failureStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
         failureStatusList.addAll(failureStatus.getSubStatus());
       } else {
@@ -379,7 +384,8 @@ public class AlterLogicalViewProcedure
 
     @Override
     protected void onAllReplicasetFailure(
-        TConsensusGroupId consensusGroupId, Set<TDataNodeLocation> 
dataNodeLocationSet) {
+        final TConsensusGroupId consensusGroupId,
+        final Set<TDataNodeLocation> dataNodeLocationSet) {
       setFailure(
           new ProcedureException(
               new MetadataException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 47b8de606f4..cc46030e6c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -623,13 +623,25 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TPipeTransferResp handleTransferSchemaPlan(final 
PipeTransferPlanNodeReq req) {
     // We may be able to skip the alter logical view's exception parsing 
because
     // the "AlterLogicalViewNode" is itself idempotent
-    return req.getPlanNode() instanceof AlterLogicalViewNode
-        ? new TPipeTransferResp(
-            ClusterConfigTaskExecutor.getInstance()
-                .alterLogicalViewByPipe((AlterLogicalViewNode) 
req.getPlanNode()))
-        : new TPipeTransferResp(
-            executeStatementAndClassifyExceptions(
-                PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
+    if (req.getPlanNode() instanceof AlterLogicalViewNode) {
+      final TSStatus status =
+          ((AlterLogicalViewNode) 
req.getPlanNode()).checkPermissionBeforeProcess(username);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.warn(
+            "Receiver id = {}: Failed to check authority for statement {}, 
username = {}, response = {}.",
+            receiverId.get(),
+            StatementType.ALTER_LOGICAL_VIEW.name(),
+            username,
+            status);
+        return new TPipeTransferResp(status);
+      }
+      return new TPipeTransferResp(
+          ClusterConfigTaskExecutor.getInstance()
+              .alterLogicalViewByPipe((AlterLogicalViewNode) 
req.getPlanNode()));
+    }
+    return new TPipeTransferResp(
+        executeStatementAndClassifyExceptions(
+            PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
   }
 
   private TPipeTransferResp handleTransferConfigPlan(final TPipeTransferReq 
req) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
index 963003c29d7..1c8f2645a01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
@@ -19,13 +19,17 @@
 
 package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathDeserializeUtil;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.exception.NotImplementedException;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -47,7 +51,8 @@ public class AlterLogicalViewNode extends PlanNode {
    */
   private final Map<PartialPath, ViewExpression> viewPathToSourceMap;
 
-  public AlterLogicalViewNode(PlanNodeId id, Map<PartialPath, ViewExpression> 
viewPathToSourceMap) {
+  public AlterLogicalViewNode(
+      final PlanNodeId id, final Map<PartialPath, ViewExpression> 
viewPathToSourceMap) {
     super(id);
     this.viewPathToSourceMap = viewPathToSourceMap;
   }
@@ -56,10 +61,24 @@ public class AlterLogicalViewNode extends PlanNode {
     return viewPathToSourceMap;
   }
 
-  // region Interfaces in WritePlanNode or PlanNode
+  // For pipe
+  // TODO: Add auth check for source paths
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
+    if (AuthorityChecker.SUPER_USER.equals(userName)) {
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
 
+    final List<PartialPath> targetPathList = new 
ArrayList<>(viewPathToSourceMap.keySet());
+    return AuthorityChecker.getTSStatus(
+        AuthorityChecker.checkFullPathListPermission(
+            userName, targetPathList, PrivilegeType.WRITE_SCHEMA.ordinal()),
+        targetPathList,
+        PrivilegeType.WRITE_SCHEMA);
+  }
+
+  // region Interfaces in WritePlanNode or PlanNode
   @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+  public <R, C> R accept(final PlanVisitor<R, C> visitor, final C context) {
     return visitor.visitAlterLogicalView(this, context);
   }
 
@@ -69,7 +88,7 @@ public class AlterLogicalViewNode extends PlanNode {
   }
 
   @Override
-  public void addChild(PlanNode child) {
+  public void addChild(final PlanNode child) {
     // do nothing. this node should never have any child
   }
 
@@ -85,14 +104,14 @@ public class AlterLogicalViewNode extends PlanNode {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    AlterLogicalViewNode that = (AlterLogicalViewNode) obj;
+    final AlterLogicalViewNode that = (AlterLogicalViewNode) obj;
     return (this.getPlanNodeId().equals(that.getPlanNodeId())
         && Objects.equals(this.viewPathToSourceMap, that.viewPathToSourceMap));
   }
@@ -116,31 +135,31 @@ public class AlterLogicalViewNode extends PlanNode {
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
+  protected void serializeAttributes(final ByteBuffer byteBuffer) {
     PlanNodeType.ALTER_LOGICAL_VIEW.serialize(byteBuffer);
     // serialize other member variables for this node
     ReadWriteIOUtils.write(this.viewPathToSourceMap.size(), byteBuffer);
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
+    for (final Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
       entry.getKey().serialize(byteBuffer);
       ViewExpression.serialize(entry.getValue(), byteBuffer);
     }
   }
 
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+  protected void serializeAttributes(final DataOutputStream stream) throws 
IOException {
     PlanNodeType.ALTER_LOGICAL_VIEW.serialize(stream);
     // serialize other member variables for this node
     ReadWriteIOUtils.write(this.viewPathToSourceMap.size(), stream);
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
+    for (final Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
       entry.getKey().serialize(stream);
       ViewExpression.serialize(entry.getValue(), stream);
     }
   }
 
-  public static AlterLogicalViewNode deserialize(ByteBuffer byteBuffer) {
+  public static AlterLogicalViewNode deserialize(final ByteBuffer byteBuffer) {
     // deserialize member variables
-    Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
-    int size = byteBuffer.getInt();
+    final Map<PartialPath, ViewExpression> viewPathToSourceMap = new 
HashMap<>();
+    final int size = byteBuffer.getInt();
     PartialPath path;
     ViewExpression viewExpression;
     for (int i = 0; i < size; i++) {
@@ -149,7 +168,7 @@ public class AlterLogicalViewNode extends PlanNode {
       viewPathToSourceMap.put(path, viewExpression);
     }
     // deserialize PlanNodeId next
-    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    final PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AlterLogicalViewNode(planNodeId, viewPathToSourceMap);
   }
   // endregion
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
index f31cdb941b9..fc9aa1486a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
@@ -40,7 +40,7 @@ public abstract class Statement extends StatementNode {
 
   protected Statement() {}
 
-  public void setType(StatementType statementType) {
+  public void setType(final StatementType statementType) {
     this.statementType = statementType;
   }
 
@@ -52,7 +52,7 @@ public abstract class Statement extends StatementNode {
     return isDebug;
   }
 
-  public void setDebug(boolean debug) {
+  public void setDebug(final boolean debug) {
     isDebug = debug;
   }
 
@@ -62,14 +62,14 @@ public abstract class Statement extends StatementNode {
 
   public abstract List<? extends PartialPath> getPaths();
 
-  public TSStatus checkPermissionBeforeProcess(String userName) {
+  public TSStatus checkPermissionBeforeProcess(final String userName) {
     return AuthorityChecker.getTSStatus(
         AuthorityChecker.SUPER_USER.equals(userName),
         "Only the admin user can perform this operation");
   }
 
   public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
toRelationalStatement(
-      MPPQueryContext context) {
+      final MPPQueryContext context) {
     throw new UnsupportedOperationException("Method not implemented yet");
   }
 }


Reply via email to