This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f6b16ed0aec Pipe: Added permission check for config receiver (#14418)
f6b16ed0aec is described below
commit f6b16ed0aec7e90289b8412a2bc7857eef263ff2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 16 22:58:46 2024 +0800
Pipe: Added permission check for config receiver (#14418)
---
.../pipe/it/manual/IoTDBPipePermissionIT.java | 71 +++++++++++-
.../iotdb/confignode/manager/ConfigManager.java | 4 +-
.../iotdb/confignode/manager/ProcedureManager.java | 10 +-
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 119 +++++++++++++++++++++
.../impl/schema/AlterLogicalViewProcedure.java | 108 ++++++++++---------
.../protocol/thrift/IoTDBDataNodeReceiver.java | 26 +++--
.../metadata/write/view/AlterLogicalViewNode.java | 47 +++++---
.../db/queryengine/plan/statement/Statement.java | 8 +-
8 files changed, 309 insertions(+), 84 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
index c35ff5e9455..648899b13ea 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipePermissionIT.java
@@ -100,7 +100,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
"create user `thulab` 'passwd'",
"create role `admin`",
"grant role `admin` to `thulab`",
- "grant WRITE, READ, MANAGE_DATABASE on root.** to role `admin`")))
{
+ "grant WRITE, READ, MANAGE_DATABASE, MANAGE_USER on root.** to
role `admin`"))) {
return;
}
@@ -113,6 +113,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
+ "create user user 'passwd'",
"create timeseries root.ln.wf02.wt01.temperature with
datatype=INT64,encoding=PLAIN",
"create timeseries root.ln.wf02.wt01.status with
datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf02.wt01(time, temperature, status) values
(1800000000000, 23, true)"))) {
@@ -143,6 +144,11 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "list user",
+ "User,",
+ new HashSet<>(Arrays.asList("root,", "user,", "thulab,")));
final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add(
"root.ln.wf02.wt01.temperature,null,root.ln,INT64,PLAIN,LZ4,null,null,null,null,BASE,");
@@ -162,4 +168,67 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualManualIT {
Collections.singleton("1800000000000,23,true,"));
}
}
+
+ @Test
+ public void testNoPermission() throws Exception {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ receiverEnv,
+ Arrays.asList(
+ "create user `thulab` 'passwd'",
+ "create role `admin`",
+ "grant role `admin` to `thulab`",
+ "grant READ, MANAGE_DATABASE on root.ln.** to role `admin`"))) {
+ return;
+ }
+
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "create user someUser 'passwd'",
+ "create database root.noPermission",
+ "create timeseries root.ln.wf02.wt01.status with
datatype=BOOLEAN,encoding=PLAIN"))) {
+ fail();
+ return;
+ }
+ awaitUntilFlush(senderEnv);
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.inclusion", "all");
+
+ connectorAttributes.put("connector", "iotdb-thrift-async-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.username", "thulab");
+ connectorAttributes.put("connector.password", "passwd");
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "count databases", "count,",
Collections.singleton("1,"));
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "show timeseries",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.emptySet());
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv, "list user", "User,", Collections.singleton("root,"));
+ }
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 06d256f236e..35f5559231f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -2185,8 +2185,8 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
- TSStatus status = confirmLeader();
+ public TSStatus alterLogicalView(final TAlterLogicalViewReq req) {
+ final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return procedureManager.alterLogicalView(req);
} else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 29a890a90ad..e18e02e4ab8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -357,11 +357,11 @@ public class ProcedureManager {
return waitingProcedureFinished(procedure);
}
- public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
- String queryId = req.getQueryId();
- ByteBuffer byteBuffer = ByteBuffer.wrap(req.getViewBinary());
- Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
- int size = byteBuffer.getInt();
+ public TSStatus alterLogicalView(final TAlterLogicalViewReq req) {
+ final String queryId = req.getQueryId();
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(req.getViewBinary());
+ final Map<PartialPath, ViewExpression> viewPathToSourceMap = new
HashMap<>();
+ final int size = byteBuffer.getInt();
PartialPath path;
ViewExpression viewExpression;
for (int i = 0; i < size; i++) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index a5e8a7248f4..656161bdb9f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.confignode.manager.pipe.receiver.protocol;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
@@ -202,6 +205,15 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
private TSStatus executePlanAndClassifyExceptions(final ConfigPhysicalPlan
plan) {
TSStatus result;
try {
+ result = checkPermission(plan);
+ if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Receiver id = {}: Permission check failed while executing plan
{}: {}",
+ receiverId.get(),
+ plan,
+ result);
+ return result;
+ }
result = executePlan(plan);
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
@@ -222,6 +234,113 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
return result;
}
+ private TSStatus checkPermission(final ConfigPhysicalPlan plan) {
+ switch (plan.getType()) {
+ case CreateDatabase:
+ case AlterDatabase:
+ case DeleteDatabase:
+ return configManager
+ .checkUserPrivileges(
+ username, Collections.emptyList(),
PrivilegeType.MANAGE_DATABASE.ordinal())
+ .getStatus();
+ case ExtendSchemaTemplate:
+ return configManager
+ .checkUserPrivileges(
+ username, Collections.emptyList(),
PrivilegeType.EXTEND_TEMPLATE.ordinal())
+ .getStatus();
+ case CreateSchemaTemplate:
+ case CommitSetSchemaTemplate:
+ case PipeUnsetTemplate:
+ return
CommonDescriptor.getInstance().getConfig().getAdminName().equals(username)
+ ? StatusUtils.OK
+ : new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
+ .setMessage("Only the admin user can perform this operation");
+ case PipeDeleteTimeSeries:
+ return configManager
+ .checkUserPrivileges(
+ username,
+ new ArrayList<>(
+ PathPatternTree.deserialize(
+ ((PipeDeleteTimeSeriesPlan)
plan).getPatternTreeBytes())
+ .getAllPathPatterns()),
+ PrivilegeType.WRITE_SCHEMA.ordinal())
+ .getStatus();
+ case PipeDeleteLogicalView:
+ return configManager
+ .checkUserPrivileges(
+ username,
+ new ArrayList<>(
+ PathPatternTree.deserialize(
+ ((PipeDeleteLogicalViewPlan)
plan).getPatternTreeBytes())
+ .getAllPathPatterns()),
+ PrivilegeType.WRITE_SCHEMA.ordinal())
+ .getStatus();
+ case PipeDeactivateTemplate:
+ return configManager
+ .checkUserPrivileges(
+ username,
+ new ArrayList<>(((PipeDeactivateTemplatePlan)
plan).getTemplateSetInfo().keySet()),
+ PrivilegeType.WRITE_SCHEMA.ordinal())
+ .getStatus();
+ case SetTTL:
+ return configManager
+ .checkUserPrivileges(
+ username,
+ Collections.singletonList(new PartialPath(((SetTTLPlan)
plan).getPathPattern())),
+ PrivilegeType.WRITE_SCHEMA.ordinal())
+ .getStatus();
+ case UpdateTriggerStateInTable:
+ case DeleteTriggerInTable:
+ return configManager
+ .checkUserPrivileges(
+ username, Collections.emptyList(),
PrivilegeType.USE_TRIGGER.ordinal())
+ .getStatus();
+ case GrantRole:
+ case GrantUser:
+ case RevokeUser:
+ case RevokeRole:
+ for (final int permission : ((AuthorPlan) plan).getPermissions()) {
+ final TSStatus status =
+ configManager
+ .checkUserPrivilegeGrantOpt(
+ username,
+ PrivilegeType.isPathRelevant(permission)
+ ? ((AuthorPlan) plan).getNodeNameList()
+ : Collections.emptyList(),
+ permission)
+ .getStatus();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
{
+ return status;
+ }
+ }
+ return StatusUtils.OK;
+ case UpdateUser:
+ return ((AuthorPlan) plan).getUserName().equals(username)
+ ? StatusUtils.OK
+ : configManager
+ .checkUserPrivileges(
+ username, Collections.emptyList(),
PrivilegeType.MANAGE_USER.ordinal())
+ .getStatus();
+ case CreateUser:
+ case CreateUserWithRawPassword:
+ case DropUser:
+ return configManager
+ .checkUserPrivileges(
+ username, Collections.emptyList(),
PrivilegeType.MANAGE_USER.ordinal())
+ .getStatus();
+ case CreateRole:
+ case DropRole:
+ case GrantRoleToUser:
+ case RevokeRoleFromUser:
+ return configManager
+ .checkUserPrivileges(
+ username, Collections.emptyList(),
PrivilegeType.MANAGE_ROLE.ordinal())
+ .getStatus();
+ default:
+ return StatusUtils.OK;
+ }
+ }
+
private TSStatus executePlan(final ConfigPhysicalPlan plan) throws
ConsensusException {
switch (plan.getType()) {
case CreateDatabase:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
index ece7b0d8c87..7db6c583aae 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java
@@ -74,14 +74,14 @@ public class AlterLogicalViewProcedure
private transient PathPatternTree pathPatternTree;
private transient ByteBuffer patternTreeBytes;
- public AlterLogicalViewProcedure(boolean isGeneratedByPipe) {
+ public AlterLogicalViewProcedure(final boolean isGeneratedByPipe) {
super(isGeneratedByPipe);
}
public AlterLogicalViewProcedure(
- String queryId,
- Map<PartialPath, ViewExpression> viewPathToSourceMap,
- boolean isGeneratedByPipe) {
+ final String queryId,
+ final Map<PartialPath, ViewExpression> viewPathToSourceMap,
+ final boolean isGeneratedByPipe) {
super(isGeneratedByPipe);
this.queryId = queryId;
this.viewPathToSourceMap = viewPathToSourceMap;
@@ -89,9 +89,10 @@ public class AlterLogicalViewProcedure
}
@Override
- protected Flow executeFromState(ConfigNodeProcedureEnv env,
AlterLogicalViewState state)
+ protected Flow executeFromState(
+ final ConfigNodeProcedureEnv env, final AlterLogicalViewState state)
throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
- long startTime = System.currentTimeMillis();
+ final long startTime = System.currentTimeMillis();
try {
switch (state) {
case CLEAN_DATANODE_SCHEMA_CACHE:
@@ -103,7 +104,7 @@ public class AlterLogicalViewProcedure
LOGGER.info("Alter view {}", viewPathToSourceMap.keySet());
try {
alterLogicalView(env);
- } catch (ProcedureException e) {
+ } catch (final ProcedureException e) {
setFailure(e);
}
return Flow.NO_MORE_STATE;
@@ -117,17 +118,17 @@ public class AlterLogicalViewProcedure
}
}
- private void invalidateCache(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ private void invalidateCache(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TInvalidateMatchedSchemaCacheReq, TSStatus>
clientHandler =
+ final DataNodeAsyncRequestContext<TInvalidateMatchedSchemaCacheReq,
TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
new TInvalidateMatchedSchemaCacheReq(patternTreeBytes),
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
+ final Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+ for (final TSStatus status : statusMap.values()) {
// all dataNodes must clear the related schemaengine cache
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
@@ -140,17 +141,17 @@ public class AlterLogicalViewProcedure
}
}
- private void alterLogicalView(ConfigNodeProcedureEnv env) throws
ProcedureException {
- Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup =
+ private void alterLogicalView(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ final Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup =
env.getConfigManager().getRelatedSchemaRegionGroup(pathPatternTree);
- Map<TConsensusGroupId, Map<PartialPath, ViewExpression>>
schemaRegionRequestMap =
+ final Map<TConsensusGroupId, Map<PartialPath, ViewExpression>>
schemaRegionRequestMap =
new HashMap<>();
- for (Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
+ for (final Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
schemaRegionRequestMap
.computeIfAbsent(getBelongedSchemaRegion(env, entry.getKey()), k ->
new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
- AlterLogicalViewRegionTaskExecutor<TAlterViewReq> regionTaskExecutor =
+ final AlterLogicalViewRegionTaskExecutor<TAlterViewReq> regionTaskExecutor
=
new AlterLogicalViewRegionTaskExecutor<>(
"Alter view",
env,
@@ -187,16 +188,16 @@ public class AlterLogicalViewProcedure
}
private TConsensusGroupId getBelongedSchemaRegion(
- ConfigNodeProcedureEnv env, PartialPath viewPath) throws
ProcedureException {
- PathPatternTree patternTree = new PathPatternTree();
+ final ConfigNodeProcedureEnv env, final PartialPath viewPath) throws
ProcedureException {
+ final PathPatternTree patternTree = new PathPatternTree();
patternTree.appendFullPath(viewPath);
patternTree.constructTree();
- Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionTable =
+ final Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
schemaPartitionTable =
env.getConfigManager().getSchemaPartition(patternTree,
false).schemaPartitionTable;
if (schemaPartitionTable.isEmpty()) {
throw new ProcedureException(new
ViewNotExistException(viewPath.getFullPath()));
} else {
- Map<TSeriesPartitionSlot, TConsensusGroupId> slotMap =
+ final Map<TSeriesPartitionSlot, TConsensusGroupId> slotMap =
schemaPartitionTable.values().iterator().next();
if (slotMap.isEmpty()) {
throw new ProcedureException(new
ViewNotExistException(viewPath.getFullPath()));
@@ -207,24 +208,24 @@ public class AlterLogicalViewProcedure
}
@Override
- protected boolean isRollbackSupported(AlterLogicalViewState
alterLogicalViewState) {
+ protected boolean isRollbackSupported(final AlterLogicalViewState
alterLogicalViewState) {
return true;
}
@Override
protected void rollbackState(
- ConfigNodeProcedureEnv env, AlterLogicalViewState alterLogicalViewState)
+ final ConfigNodeProcedureEnv env, final AlterLogicalViewState
alterLogicalViewState)
throws IOException, InterruptedException, ProcedureException {
invalidateCache(env);
}
@Override
- protected AlterLogicalViewState getState(int stateId) {
+ protected AlterLogicalViewState getState(final int stateId) {
return AlterLogicalViewState.values()[stateId];
}
@Override
- protected int getStateId(AlterLogicalViewState alterLogicalViewState) {
+ protected int getStateId(final AlterLogicalViewState alterLogicalViewState) {
return alterLogicalViewState.ordinal();
}
@@ -238,16 +239,16 @@ public class AlterLogicalViewProcedure
}
private void generatePathPatternTree() {
- PathPatternTree patternTree = new PathPatternTree();
- for (PartialPath path : viewPathToSourceMap.keySet()) {
+ final PathPatternTree patternTree = new PathPatternTree();
+ for (final PartialPath path : viewPathToSourceMap.keySet()) {
patternTree.appendFullPath(path);
}
patternTree.constructTree();
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ final DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
try {
patternTree.serialize(dataOutputStream);
- } catch (IOException ignored) {
+ } catch (final IOException ignored) {
// won't reach here
}
@@ -256,7 +257,7 @@ public class AlterLogicalViewProcedure
}
@Override
- public void serialize(DataOutputStream stream) throws IOException {
+ public void serialize(final DataOutputStream stream) throws IOException {
stream.writeShort(
isGeneratedByPipe
?
ProcedureType.PIPE_ENRICHED_ALTER_LOGICAL_VIEW_PROCEDURE.getTypeCode()
@@ -264,19 +265,19 @@ public class AlterLogicalViewProcedure
super.serialize(stream);
ReadWriteIOUtils.write(queryId, stream);
ReadWriteIOUtils.write(this.viewPathToSourceMap.size(), stream);
- for (Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
+ for (final Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
entry.getKey().serialize(stream);
ViewExpression.serialize(entry.getValue(), stream);
}
}
@Override
- public void deserialize(ByteBuffer byteBuffer) {
+ public void deserialize(final ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
queryId = ReadWriteIOUtils.readString(byteBuffer);
- Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
- int size = byteBuffer.getInt();
+ final Map<PartialPath, ViewExpression> viewPathToSourceMap = new
HashMap<>();
+ final int size = byteBuffer.getInt();
PartialPath path;
ViewExpression viewExpression;
for (int i = 0; i < size; i++) {
@@ -289,10 +290,14 @@ public class AlterLogicalViewProcedure
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof AlterLogicalViewProcedure)) return false;
- AlterLogicalViewProcedure that = (AlterLogicalViewProcedure) o;
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AlterLogicalViewProcedure)) {
+ return false;
+ }
+ final AlterLogicalViewProcedure that = (AlterLogicalViewProcedure) o;
return Objects.equals(getProcId(), that.getProcId())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& Objects.equals(getCycles(), that.getCycles())
@@ -320,27 +325,27 @@ public class AlterLogicalViewProcedure
private final List<TSStatus> failureStatusList = new ArrayList<>();
AlterLogicalViewRegionTaskExecutor(
- String taskName,
- ConfigNodeProcedureEnv env,
- Map<TConsensusGroupId, TRegionReplicaSet> targetSchemaRegionGroup,
- CnToDnAsyncRequestType dataNodeRequestType,
- BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q>
dataNodeRequestGenerator) {
+ final String taskName,
+ final ConfigNodeProcedureEnv env,
+ final Map<TConsensusGroupId, TRegionReplicaSet>
targetSchemaRegionGroup,
+ final CnToDnAsyncRequestType dataNodeRequestType,
+ final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q>
dataNodeRequestGenerator) {
super(env, targetSchemaRegionGroup, false, dataNodeRequestType,
dataNodeRequestGenerator);
this.taskName = taskName;
}
@Override
protected List<TConsensusGroupId> processResponseOfOneDataNode(
- TDataNodeLocation dataNodeLocation,
- List<TConsensusGroupId> consensusGroupIdList,
- TSStatus response) {
- List<TConsensusGroupId> failedRegionList = new ArrayList<>();
+ final TDataNodeLocation dataNodeLocation,
+ final List<TConsensusGroupId> consensusGroupIdList,
+ final TSStatus response) {
+ final List<TConsensusGroupId> failedRegionList = new ArrayList<>();
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return failedRegionList;
}
if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- List<TSStatus> subStatusList = response.getSubStatus();
+ final List<TSStatus> subStatusList = response.getSubStatus();
TSStatus subStatus;
for (int i = 0; i < subStatusList.size(); i++) {
subStatus = subStatusList.get(i);
@@ -359,7 +364,7 @@ public class AlterLogicalViewProcedure
return failedRegionList;
}
- private void collectFailure(TSStatus failureStatus) {
+ private void collectFailure(final TSStatus failureStatus) {
if (failureStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
failureStatusList.addAll(failureStatus.getSubStatus());
} else {
@@ -379,7 +384,8 @@ public class AlterLogicalViewProcedure
@Override
protected void onAllReplicasetFailure(
- TConsensusGroupId consensusGroupId, Set<TDataNodeLocation>
dataNodeLocationSet) {
+ final TConsensusGroupId consensusGroupId,
+ final Set<TDataNodeLocation> dataNodeLocationSet) {
setFailure(
new ProcedureException(
new MetadataException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 47b8de606f4..cc46030e6c3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -623,13 +623,25 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TPipeTransferResp handleTransferSchemaPlan(final
PipeTransferPlanNodeReq req) {
// We may be able to skip the alter logical view's exception parsing
because
// the "AlterLogicalViewNode" is itself idempotent
- return req.getPlanNode() instanceof AlterLogicalViewNode
- ? new TPipeTransferResp(
- ClusterConfigTaskExecutor.getInstance()
- .alterLogicalViewByPipe((AlterLogicalViewNode)
req.getPlanNode()))
- : new TPipeTransferResp(
- executeStatementAndClassifyExceptions(
- PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
+ if (req.getPlanNode() instanceof AlterLogicalViewNode) {
+ final TSStatus status =
+ ((AlterLogicalViewNode)
req.getPlanNode()).checkPermissionBeforeProcess(username);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Receiver id = {}: Failed to check authority for statement {},
username = {}, response = {}.",
+ receiverId.get(),
+ StatementType.ALTER_LOGICAL_VIEW.name(),
+ username,
+ status);
+ return new TPipeTransferResp(status);
+ }
+ return new TPipeTransferResp(
+ ClusterConfigTaskExecutor.getInstance()
+ .alterLogicalViewByPipe((AlterLogicalViewNode)
req.getPlanNode()));
+ }
+ return new TPipeTransferResp(
+ executeStatementAndClassifyExceptions(
+ PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
}
private TPipeTransferResp handleTransferConfigPlan(final TPipeTransferReq
req) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
index 963003c29d7..1c8f2645a01 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/view/AlterLogicalViewNode.java
@@ -19,13 +19,17 @@
package
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -47,7 +51,8 @@ public class AlterLogicalViewNode extends PlanNode {
*/
private final Map<PartialPath, ViewExpression> viewPathToSourceMap;
- public AlterLogicalViewNode(PlanNodeId id, Map<PartialPath, ViewExpression>
viewPathToSourceMap) {
+ public AlterLogicalViewNode(
+ final PlanNodeId id, final Map<PartialPath, ViewExpression>
viewPathToSourceMap) {
super(id);
this.viewPathToSourceMap = viewPathToSourceMap;
}
@@ -56,10 +61,24 @@ public class AlterLogicalViewNode extends PlanNode {
return viewPathToSourceMap;
}
- // region Interfaces in WritePlanNode or PlanNode
+ // For pipe
+ // TODO: Add auth check for source paths
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
+ if (AuthorityChecker.SUPER_USER.equals(userName)) {
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+ final List<PartialPath> targetPathList = new
ArrayList<>(viewPathToSourceMap.keySet());
+ return AuthorityChecker.getTSStatus(
+ AuthorityChecker.checkFullPathListPermission(
+ userName, targetPathList, PrivilegeType.WRITE_SCHEMA.ordinal()),
+ targetPathList,
+ PrivilegeType.WRITE_SCHEMA);
+ }
+
+ // region Interfaces in WritePlanNode or PlanNode
@Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ public <R, C> R accept(final PlanVisitor<R, C> visitor, final C context) {
return visitor.visitAlterLogicalView(this, context);
}
@@ -69,7 +88,7 @@ public class AlterLogicalViewNode extends PlanNode {
}
@Override
- public void addChild(PlanNode child) {
+ public void addChild(final PlanNode child) {
// do nothing. this node should never have any child
}
@@ -85,14 +104,14 @@ public class AlterLogicalViewNode extends PlanNode {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- AlterLogicalViewNode that = (AlterLogicalViewNode) obj;
+ final AlterLogicalViewNode that = (AlterLogicalViewNode) obj;
return (this.getPlanNodeId().equals(that.getPlanNodeId())
&& Objects.equals(this.viewPathToSourceMap, that.viewPathToSourceMap));
}
@@ -116,31 +135,31 @@ public class AlterLogicalViewNode extends PlanNode {
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
+ protected void serializeAttributes(final ByteBuffer byteBuffer) {
PlanNodeType.ALTER_LOGICAL_VIEW.serialize(byteBuffer);
// serialize other member variables for this node
ReadWriteIOUtils.write(this.viewPathToSourceMap.size(), byteBuffer);
- for (Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
+ for (final Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
entry.getKey().serialize(byteBuffer);
ViewExpression.serialize(entry.getValue(), byteBuffer);
}
}
@Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ protected void serializeAttributes(final DataOutputStream stream) throws
IOException {
PlanNodeType.ALTER_LOGICAL_VIEW.serialize(stream);
// serialize other member variables for this node
ReadWriteIOUtils.write(this.viewPathToSourceMap.size(), stream);
- for (Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
+ for (final Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
entry.getKey().serialize(stream);
ViewExpression.serialize(entry.getValue(), stream);
}
}
- public static AlterLogicalViewNode deserialize(ByteBuffer byteBuffer) {
+ public static AlterLogicalViewNode deserialize(final ByteBuffer byteBuffer) {
// deserialize member variables
- Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
- int size = byteBuffer.getInt();
+ final Map<PartialPath, ViewExpression> viewPathToSourceMap = new
HashMap<>();
+ final int size = byteBuffer.getInt();
PartialPath path;
ViewExpression viewExpression;
for (int i = 0; i < size; i++) {
@@ -149,7 +168,7 @@ public class AlterLogicalViewNode extends PlanNode {
viewPathToSourceMap.put(path, viewExpression);
}
// deserialize PlanNodeId next
- PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ final PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new AlterLogicalViewNode(planNodeId, viewPathToSourceMap);
}
// endregion
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
index f31cdb941b9..fc9aa1486a9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
@@ -40,7 +40,7 @@ public abstract class Statement extends StatementNode {
protected Statement() {}
- public void setType(StatementType statementType) {
+ public void setType(final StatementType statementType) {
this.statementType = statementType;
}
@@ -52,7 +52,7 @@ public abstract class Statement extends StatementNode {
return isDebug;
}
- public void setDebug(boolean debug) {
+ public void setDebug(final boolean debug) {
isDebug = debug;
}
@@ -62,14 +62,14 @@ public abstract class Statement extends StatementNode {
public abstract List<? extends PartialPath> getPaths();
- public TSStatus checkPermissionBeforeProcess(String userName) {
+ public TSStatus checkPermissionBeforeProcess(final String userName) {
return AuthorityChecker.getTSStatus(
AuthorityChecker.SUPER_USER.equals(userName),
"Only the admin user can perform this operation");
}
public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
toRelationalStatement(
- MPPQueryContext context) {
+ final MPPQueryContext context) {
throw new UnsupportedOperationException("Method not implemented yet");
}
}