This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 502f54a5beb Pipe: Added the CreateMultiTimeSeries with merge logic and
disabled timeSeries existence check at SchemaExecutionVisitor for plans
transferred by pipe (#14317)
502f54a5beb is described below
commit 502f54a5beba7040809777940a4f92313cd25be6
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 5 12:08:11 2024 +0800
Pipe: Added the CreateMultiTimeSeries with merge logic and disabled
timeSeries existence check at SchemaExecutionVisitor for plans transferred by
pipe (#14317)
---
.../schemaregion/SchemaExecutionVisitor.java | 102 ++++----
.../execution/executor/RegionWriteExecutor.java | 268 ++++++++++-----------
2 files changed, 187 insertions(+), 183 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index c816f8760fa..cfcbfb07e1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -148,9 +148,11 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
- schemaRegion.createTimeSeries(
- transformToCreateTimeSeriesPlan(devicePath, measurementGroup,
i), -1);
- } catch (MetadataException e) {
+ final ICreateTimeSeriesPlan createTimeSeriesPlan =
+ transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i);
+ ((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).setWithMerge(node.isGeneratedByPipe());
+ schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
+ } catch (final MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME,
e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
@@ -196,7 +198,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion,
alreadyExistingTimeSeries,
failingStatus,
- false);
+ node.isGeneratedByPipe());
} else {
executeInternalCreateTimeSeries(
devicePath,
@@ -204,7 +206,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
schemaRegion,
alreadyExistingTimeSeries,
failingStatus,
- false);
+ node.isGeneratedByPipe());
}
if (!failingStatus.isEmpty()) {
@@ -389,7 +391,8 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
@Override
- public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion
schemaRegion) {
+ public TSStatus visitAlterTimeSeries(
+ final AlterTimeSeriesNode node, final ISchemaRegion schemaRegion) {
try {
switch (node.getAlterType()) {
case RENAME:
@@ -425,12 +428,14 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
@Override
- public TSStatus visitActivateTemplate(ActivateTemplateNode node,
ISchemaRegion schemaRegion) {
+ public TSStatus visitActivateTemplate(
+ final ActivateTemplateNode node, final ISchemaRegion schemaRegion) {
try {
- Template template =
ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
+ final Template template =
+
ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
schemaRegion.activateSchemaTemplate(node, template);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -438,16 +443,17 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitBatchActivateTemplate(
- BatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
- for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+ final BatchActivateTemplateNode node, final ISchemaRegion schemaRegion) {
+ for (final Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
node.getTemplateActivationMap().entrySet()) {
- Template template =
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
+ final Template template =
+
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
try {
schemaRegion.activateSchemaTemplate(
SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
entry.getKey(), entry.getValue().right, entry.getValue().left),
template);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -457,21 +463,22 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitInternalBatchActivateTemplate(
- InternalBatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
- for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+ final InternalBatchActivateTemplateNode node, final ISchemaRegion
schemaRegion) {
+ for (final Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
node.getTemplateActivationMap().entrySet()) {
- Template template =
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
+ final Template template =
+
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
try {
schemaRegion.activateSchemaTemplate(
SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
entry.getKey(), entry.getValue().right, entry.getValue().left),
template);
- } catch (TemplateIsInUseException e) {
+ } catch (final TemplateIsInUseException e) {
logger.info(
String.format(
"Device Template has already been activated on path %s,
there's no need to activate again.",
entry.getKey()));
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -498,22 +505,23 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitRollbackSchemaBlackList(
- RollbackSchemaBlackListNode node, ISchemaRegion schemaRegion) {
+ final RollbackSchemaBlackListNode node, final ISchemaRegion
schemaRegion) {
try {
schemaRegion.rollbackSchemaBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
- public TSStatus visitDeleteTimeseries(DeleteTimeSeriesNode node,
ISchemaRegion schemaRegion) {
+ public TSStatus visitDeleteTimeseries(
+ final DeleteTimeSeriesNode node, final ISchemaRegion schemaRegion) {
try {
schemaRegion.deleteTimeseriesInBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -521,11 +529,12 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitPreDeactivateTemplate(
- PreDeactivateTemplateNode node, ISchemaRegion schemaRegion) {
+ final PreDeactivateTemplateNode node, final ISchemaRegion schemaRegion) {
try {
- long preDeactivateNum =
schemaRegion.constructSchemaBlackListWithTemplate(node);
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS,
String.valueOf(preDeactivateNum));
- } catch (MetadataException e) {
+ return RpcUtils.getStatus(
+ TSStatusCode.SUCCESS_STATUS,
+
String.valueOf(schemaRegion.constructSchemaBlackListWithTemplate(node)));
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -533,18 +542,19 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitRollbackPreDeactivateTemplate(
- RollbackPreDeactivateTemplateNode node, ISchemaRegion schemaRegion) {
+ final RollbackPreDeactivateTemplateNode node, final ISchemaRegion
schemaRegion) {
try {
schemaRegion.rollbackSchemaBlackListWithTemplate(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
- public TSStatus visitDeactivateTemplate(DeactivateTemplateNode node,
ISchemaRegion schemaRegion) {
+ public TSStatus visitDeactivateTemplate(
+ final DeactivateTemplateNode node, final ISchemaRegion schemaRegion) {
try {
schemaRegion.deactivateTemplateInBlackList(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -555,15 +565,17 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
@Override
- public TSStatus visitCreateLogicalView(CreateLogicalViewNode node,
ISchemaRegion schemaRegion) {
- Map<PartialPath, ViewExpression> viewPathToSourceMap =
node.getViewPathToSourceExpressionMap();
- List<TSStatus> failingStatus = new ArrayList<>();
- for (Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
+ public TSStatus visitCreateLogicalView(
+ final CreateLogicalViewNode node, final ISchemaRegion schemaRegion) {
+ final Map<PartialPath, ViewExpression> viewPathToSourceMap =
+ node.getViewPathToSourceExpressionMap();
+ final List<TSStatus> failingStatus = new ArrayList<>();
+ for (final Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
try {
schemaRegion.createLogicalView(
SchemaRegionWritePlanFactory.getCreateLogicalViewPlan(
entry.getKey(), entry.getValue()));
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
@@ -575,14 +587,15 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
@Override
- public TSStatus visitAlterLogicalView(AlterLogicalViewNode node,
ISchemaRegion schemaRegion) {
- Map<PartialPath, ViewExpression> viewPathToSourceMap =
node.getViewPathToSourceMap();
- List<TSStatus> failingStatus = new ArrayList<>();
- for (Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
+ public TSStatus visitAlterLogicalView(
+ final AlterLogicalViewNode node, final ISchemaRegion schemaRegion) {
+ final Map<PartialPath, ViewExpression> viewPathToSourceMap =
node.getViewPathToSourceMap();
+ final List<TSStatus> failingStatus = new ArrayList<>();
+ for (final Map.Entry<PartialPath, ViewExpression> entry :
viewPathToSourceMap.entrySet()) {
try {
schemaRegion.alterLogicalView(
SchemaRegionWritePlanFactory.getAlterLogicalViewPlan(entry.getKey(),
entry.getValue()));
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.warn("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
}
@@ -595,10 +608,11 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitConstructLogicalViewBlackList(
- ConstructLogicalViewBlackListNode node, ISchemaRegion schemaRegion) {
+ final ConstructLogicalViewBlackListNode node, final ISchemaRegion
schemaRegion) {
try {
- long preDeletedNum =
schemaRegion.constructLogicalViewBlackList(node.getPatternTree());
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS,
String.valueOf(preDeletedNum));
+ return RpcUtils.getStatus(
+ TSStatusCode.SUCCESS_STATUS,
+
String.valueOf(schemaRegion.constructLogicalViewBlackList(node.getPatternTree())));
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
@@ -607,11 +621,11 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitRollbackLogicalViewBlackList(
- RollbackLogicalViewBlackListNode node, ISchemaRegion schemaRegion) {
+ final RollbackLogicalViewBlackListNode node, final ISchemaRegion
schemaRegion) {
try {
schemaRegion.rollbackLogicalViewBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index cbfe1682e19..cef49d96c3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -162,7 +162,8 @@ public class RegionWriteExecutor {
extends PlanVisitor<RegionExecutionResult,
WritePlanNodeExecutionContext> {
@Override
- public RegionExecutionResult visitPlan(PlanNode node,
WritePlanNodeExecutionContext context) {
+ public RegionExecutionResult visitPlan(
+ final PlanNode node, final WritePlanNodeExecutionContext context) {
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
return RegionExecutionResult.create(
@@ -174,12 +175,12 @@ public class RegionWriteExecutor {
}
try {
- TSStatus status =
executePlanNodeInConsensusLayer(context.getRegionId(), node);
+ final TSStatus status =
executePlanNodeInConsensusLayer(context.getRegionId(), node);
return RegionExecutionResult.create(
TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
status.getMessage(),
status);
- } catch (ConsensusException e) {
+ } catch (final ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
TSStatus status =
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
if (e instanceof ConsensusGroupNotExistException) {
@@ -189,8 +190,8 @@ public class RegionWriteExecutor {
}
}
- private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId,
PlanNode planNode)
- throws ConsensusException {
+ private TSStatus executePlanNodeInConsensusLayer(
+ final ConsensusGroupId groupId, final PlanNode planNode) throws
ConsensusException {
if (groupId instanceof DataRegionId) {
return dataRegionConsensus.write(groupId, planNode);
} else {
@@ -200,64 +201,65 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitInsertRow(
- InsertRowNode node, WritePlanNodeExecutionContext context) {
+ final InsertRowNode node, final WritePlanNodeExecutionContext context)
{
return executeDataInsert(node, context);
}
@Override
public RegionExecutionResult visitInsertTablet(
- InsertTabletNode node, WritePlanNodeExecutionContext context) {
+ final InsertTabletNode node, final WritePlanNodeExecutionContext
context) {
return executeDataInsert(node, context);
}
@Override
public RegionExecutionResult visitRelationalInsertTablet(
- RelationalInsertTabletNode node, WritePlanNodeExecutionContext
context) {
+ final RelationalInsertTabletNode node, final
WritePlanNodeExecutionContext context) {
return executeDataInsert(node, context);
}
@Override
public RegionExecutionResult visitInsertRows(
- InsertRowsNode node, WritePlanNodeExecutionContext context) {
+ final InsertRowsNode node, final WritePlanNodeExecutionContext
context) {
return executeDataInsert(node, context);
}
@Override
public RegionExecutionResult visitInsertMultiTablets(
- InsertMultiTabletsNode node, WritePlanNodeExecutionContext context) {
+ final InsertMultiTabletsNode node, final WritePlanNodeExecutionContext
context) {
return executeDataInsert(node, context);
}
@Override
public RegionExecutionResult visitInsertRowsOfOneDevice(
- InsertRowsOfOneDeviceNode node, WritePlanNodeExecutionContext context)
{
+ final InsertRowsOfOneDeviceNode node, final
WritePlanNodeExecutionContext context) {
return executeDataInsert(node, context);
}
@Override
public RegionExecutionResult visitPipeEnrichedInsertNode(
- PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) {
+ final PipeEnrichedInsertNode node, final WritePlanNodeExecutionContext
context) {
return executeDataInsert(node, context);
}
private RegionExecutionResult executeDataInsert(
- InsertNode insertNode, WritePlanNodeExecutionContext context) {
+ final InsertNode insertNode, final WritePlanNodeExecutionContext
context) {
if (context.getRegionWriteValidationRWLock() == null) {
- String message = "Failed to get the lock of the region because the
region is not existed.";
+ final String message =
+ "Failed to get the lock of the region because the region is not
existed.";
return RegionExecutionResult.create(
false, message,
RpcUtils.getStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP, message));
}
context.getRegionWriteValidationRWLock().readLock().lock();
try {
- TSStatus status = fireTriggerAndInsert(context.getRegionId(),
insertNode);
+ final TSStatus status = fireTriggerAndInsert(context.getRegionId(),
insertNode);
return RegionExecutionResult.create(
TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
status.message,
status);
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
- TSStatus status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR,
e.toString());
+ final TSStatus status =
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, e.toString());
if (e instanceof ConsensusGroupNotExistException) {
status.setCode(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode());
}
@@ -267,8 +269,8 @@ public class RegionWriteExecutor {
}
}
- private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode
insertNode)
- throws ConsensusException {
+ private TSStatus fireTriggerAndInsert(
+ final ConsensusGroupId groupId, final InsertNode insertNode) throws
ConsensusException {
long triggerCostTime = 0;
TSStatus status;
long startTime = System.nanoTime();
@@ -281,7 +283,7 @@ public class RegionWriteExecutor {
TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
"Failed to complete the insertion because trigger error before
the insertion.");
} else {
- long startWriteTime = System.nanoTime();
+ final long startWriteTime = System.nanoTime();
status = dataRegionConsensus.write(groupId, insertNode);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() -
startWriteTime);
@@ -304,7 +306,7 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitPipeEnrichedDeleteDataNode(
- PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext
context) {
+ final PipeEnrichedDeleteDataNode node, final
WritePlanNodeExecutionContext context) {
// data deletion should block data insertion, especially when executed
for deleting timeseries
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
@@ -316,7 +318,7 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitDeleteData(
- DeleteDataNode node, WritePlanNodeExecutionContext context) {
+ final DeleteDataNode node, final WritePlanNodeExecutionContext
context) {
// data deletion don't need to block data insertion, but there are some
creation operation
// require write lock on data region.
context.getRegionWriteValidationRWLock().writeLock().lock();
@@ -329,7 +331,7 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitDeleteData(
- RelationalDeleteDataNode node, WritePlanNodeExecutionContext context) {
+ final RelationalDeleteDataNode node, final
WritePlanNodeExecutionContext context) {
// data deletion don't need to block data insertion, but there are some
creation operation
// require write lock on data region.
context.getRegionWriteValidationRWLock().writeLock().lock();
@@ -342,35 +344,34 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitCreateTimeSeries(
- CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+ final CreateTimeSeriesNode node, final WritePlanNodeExecutionContext
context) {
return executeCreateTimeSeries(node, context, false);
}
private RegionExecutionResult executeCreateTimeSeries(
- CreateTimeSeriesNode node,
- WritePlanNodeExecutionContext context,
- boolean receivedFromPipe) {
- ISchemaRegion schemaRegion =
+ final CreateTimeSeriesNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
+ final ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
- RegionExecutionResult result =
+ final RegionExecutionResult result =
checkQuotaBeforeCreatingTimeSeries(schemaRegion,
node.getPath().getDevicePath(), 1);
if (result != null) {
return result;
}
- if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
+ if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+ && !receivedFromPipe) {
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
- Map<Integer, MetadataException> failingMeasurementMap =
+ final Map<Integer, MetadataException> failingMeasurementMap =
schemaRegion.checkMeasurementExistence(
node.getPath().getDevicePath(),
Collections.singletonList(node.getPath().getMeasurement()),
Collections.singletonList(node.getAlias()));
if (failingMeasurementMap.isEmpty()) {
- return receivedFromPipe
- ? super.visitPipeEnrichedWritePlanNode(new
PipeEnrichedWritePlanNode(node), context)
- : super.visitCreateTimeSeries(node, context);
+ return super.visitCreateTimeSeries(node, context);
} else {
- MetadataException metadataException = failingMeasurementMap.get(0);
+ final MetadataException metadataException =
failingMeasurementMap.get(0);
LOGGER.warn(METADATA_ERROR_MSG, metadataException);
return RegionExecutionResult.create(
false,
@@ -390,34 +391,34 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitCreateAlignedTimeSeries(
- CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ final CreateAlignedTimeSeriesNode node, final
WritePlanNodeExecutionContext context) {
return executeCreateAlignedTimeSeries(node, context, false);
}
private RegionExecutionResult executeCreateAlignedTimeSeries(
- CreateAlignedTimeSeriesNode node,
- WritePlanNodeExecutionContext context,
- boolean receivedFromPipe) {
- ISchemaRegion schemaRegion =
+ final CreateAlignedTimeSeriesNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
+ final ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
- RegionExecutionResult result =
+ final RegionExecutionResult result =
checkQuotaBeforeCreatingTimeSeries(
schemaRegion, node.getDevicePath(),
node.getMeasurements().size());
if (result != null) {
return result;
}
- if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
+ if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+ && !receivedFromPipe) {
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
- Map<Integer, MetadataException> failingMeasurementMap =
+ final Map<Integer, MetadataException> failingMeasurementMap =
schemaRegion.checkMeasurementExistence(
node.getDevicePath(), node.getMeasurements(),
node.getAliasList());
if (failingMeasurementMap.isEmpty()) {
- return receivedFromPipe
- ? super.visitPipeEnrichedWritePlanNode(new
PipeEnrichedWritePlanNode(node), context)
- : super.visitCreateAlignedTimeSeries(node, context);
+ return super.visitCreateAlignedTimeSeries(node, context);
} else {
- MetadataException metadataException =
failingMeasurementMap.values().iterator().next();
+ final MetadataException metadataException =
+ failingMeasurementMap.values().iterator().next();
LOGGER.warn(METADATA_ERROR_MSG, metadataException);
return RegionExecutionResult.create(
false,
@@ -437,18 +438,18 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitCreateMultiTimeSeries(
- CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context)
{
+ final CreateMultiTimeSeriesNode node, final
WritePlanNodeExecutionContext context) {
return executeCreateMultiTimeSeries(node, context, false);
}
private RegionExecutionResult executeCreateMultiTimeSeries(
- CreateMultiTimeSeriesNode node,
- WritePlanNodeExecutionContext context,
- boolean receivedFromPipe) {
- ISchemaRegion schemaRegion =
+ final CreateMultiTimeSeriesNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
+ final ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
RegionExecutionResult result;
- for (Map.Entry<PartialPath, MeasurementGroup> entry :
+ for (final Map.Entry<PartialPath, MeasurementGroup> entry :
node.getMeasurementGroupMap().entrySet()) {
result =
checkQuotaBeforeCreatingTimeSeries(
@@ -457,29 +458,30 @@ public class RegionWriteExecutor {
return result;
}
}
- if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
+ if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+ && !receivedFromPipe) {
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
- List<TSStatus> failingStatus = new ArrayList<>();
- Map<PartialPath, MeasurementGroup> measurementGroupMap =
node.getMeasurementGroupMap();
- List<PartialPath> emptyDeviceList = new ArrayList<>();
+ final List<TSStatus> failingStatus = new ArrayList<>();
+ final Map<PartialPath, MeasurementGroup> measurementGroupMap =
+ node.getMeasurementGroupMap();
+ final List<PartialPath> emptyDeviceList = new ArrayList<>();
checkMeasurementExistence(
measurementGroupMap, schemaRegion, failingStatus,
emptyDeviceList);
- for (PartialPath emptyDevice : emptyDeviceList) {
+ for (final PartialPath emptyDevice : emptyDeviceList) {
measurementGroupMap.remove(emptyDevice);
}
- RegionExecutionResult failingResult =
- registerTimeSeries(
- measurementGroupMap, node, context, failingStatus,
receivedFromPipe);
+ final RegionExecutionResult failingResult =
+ registerTimeSeries(measurementGroupMap, node, context,
failingStatus);
if (failingResult != null) {
return failingResult;
}
- TSStatus status = RpcUtils.getStatus(failingStatus);
+ final TSStatus status = RpcUtils.getStatus(failingStatus);
return RegionExecutionResult.create(false, status.getMessage(),
status);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
@@ -492,12 +494,12 @@ public class RegionWriteExecutor {
}
private void checkMeasurementExistence(
- Map<PartialPath, MeasurementGroup> measurementGroupMap,
- ISchemaRegion schemaRegion,
- List<TSStatus> failingStatus,
- List<PartialPath> emptyDeviceList) {
- for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
- Map<Integer, MetadataException> failingMeasurementMap =
+ final Map<PartialPath, MeasurementGroup> measurementGroupMap,
+ final ISchemaRegion schemaRegion,
+ final List<TSStatus> failingStatus,
+ final List<PartialPath> emptyDeviceList) {
+ for (final Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
+ final Map<Integer, MetadataException> failingMeasurementMap =
schemaRegion.checkMeasurementExistence(
entry.getKey(),
entry.getValue().getMeasurements(),
@@ -506,7 +508,7 @@ public class RegionWriteExecutor {
continue;
}
- for (Map.Entry<Integer, MetadataException> failingMeasurement :
+ for (final Map.Entry<Integer, MetadataException> failingMeasurement :
failingMeasurementMap.entrySet()) {
LOGGER.warn(METADATA_ERROR_MSG, failingMeasurement.getValue());
failingStatus.add(
@@ -523,22 +525,19 @@ public class RegionWriteExecutor {
}
private RegionExecutionResult registerTimeSeries(
- Map<PartialPath, MeasurementGroup> measurementGroupMap,
- CreateMultiTimeSeriesNode node,
- WritePlanNodeExecutionContext context,
- List<TSStatus> failingStatus,
- boolean receivedFromPipe) {
+ final Map<PartialPath, MeasurementGroup> measurementGroupMap,
+ final CreateMultiTimeSeriesNode node,
+ final WritePlanNodeExecutionContext context,
+ final List<TSStatus> failingStatus) {
if (!measurementGroupMap.isEmpty()) {
// try registering the rest timeseries
- RegionExecutionResult executionResult =
- receivedFromPipe
- ? super.visitPipeEnrichedWritePlanNode(new
PipeEnrichedWritePlanNode(node), context)
- : super.visitCreateMultiTimeSeries(node, context);
+ final RegionExecutionResult executionResult =
+ super.visitCreateMultiTimeSeries(node, context);
if (failingStatus.isEmpty()) {
return executionResult;
}
- TSStatus executionStatus = executionResult.getStatus();
+ final TSStatus executionStatus = executionResult.getStatus();
if (executionStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
failingStatus.addAll(executionStatus.getSubStatus());
} else if (executionStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -550,36 +549,37 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitInternalCreateTimeSeries(
- InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ final InternalCreateTimeSeriesNode node, final
WritePlanNodeExecutionContext context) {
return executeInternalCreateTimeSeries(node, context, false);
}
private RegionExecutionResult executeInternalCreateTimeSeries(
- InternalCreateTimeSeriesNode node,
- WritePlanNodeExecutionContext context,
- boolean receivedFromPipe) {
- ISchemaRegion schemaRegion =
+ final InternalCreateTimeSeriesNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
+ final ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
- RegionExecutionResult result =
+ final RegionExecutionResult result =
checkQuotaBeforeCreatingTimeSeries(
schemaRegion, node.getDevicePath(),
node.getMeasurementGroup().size());
if (result != null) {
return result;
}
- if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
+ if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+ && !receivedFromPipe) {
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
- List<TSStatus> failingStatus = new ArrayList<>();
- List<TSStatus> alreadyExistingStatus = new ArrayList<>();
- MeasurementGroup measurementGroup = node.getMeasurementGroup();
- Map<Integer, MetadataException> failingMeasurementMap =
+ final List<TSStatus> failingStatus = new ArrayList<>();
+ final List<TSStatus> alreadyExistingStatus = new ArrayList<>();
+ final MeasurementGroup measurementGroup = node.getMeasurementGroup();
+ final Map<Integer, MetadataException> failingMeasurementMap =
schemaRegion.checkMeasurementExistence(
node.getDevicePath(),
measurementGroup.getMeasurements(),
measurementGroup.getAliasList());
MetadataException metadataException;
// filter failed measurement and keep the rest for execution
- for (Map.Entry<Integer, MetadataException> failingMeasurement :
+ for (final Map.Entry<Integer, MetadataException> failingMeasurement :
failingMeasurementMap.entrySet()) {
metadataException = failingMeasurement.getValue();
if (metadataException.getErrorCode()
@@ -592,7 +592,7 @@ public class RegionWriteExecutor {
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
- int errorCode = metadataException.getErrorCode();
+ final int errorCode = metadataException.getErrorCode();
if (errorCode != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
|| errorCode !=
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
LOGGER.warn(METADATA_ERROR_MSG, metadataException);
@@ -605,10 +605,7 @@ public class RegionWriteExecutor {
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
return processExecutionResultOfInternalCreateSchema(
- receivedFromPipe
- ? super.visitPipeEnrichedWritePlanNode(
- new PipeEnrichedWritePlanNode(node), context)
- : super.visitInternalCreateTimeSeries(node, context),
+ super.visitInternalCreateTimeSeries(node, context),
failingStatus,
alreadyExistingStatus);
} finally {
@@ -623,7 +620,7 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitInternalCreateMultiTimeSeries(
- InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext
context) {
+ final InternalCreateMultiTimeSeriesNode node, final
WritePlanNodeExecutionContext context) {
return executeInternalCreateMultiTimeSeries(node, context, false);
}
@@ -643,56 +640,49 @@ public class RegionWriteExecutor {
return result;
}
}
- if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{
+ if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+ && !receivedFromPipe) {
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
final List<TSStatus> failingStatus = new ArrayList<>();
final List<TSStatus> alreadyExistingStatus = new ArrayList<>();
- // Do not filter measurements if the node is generated by pipe
- // Because pipe may use the InternalCreateMultiTimeSeriesNode to
transfer historical data
- // And the alias/tags/attributes may need to be updated for existing
time series
- if (!receivedFromPipe) {
- MeasurementGroup measurementGroup;
- Map<Integer, MetadataException> failingMeasurementMap;
- MetadataException metadataException;
- for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
- node.getDeviceMap().entrySet()) {
- measurementGroup = deviceEntry.getValue().right;
- failingMeasurementMap =
- schemaRegion.checkMeasurementExistence(
- deviceEntry.getKey(),
- measurementGroup.getMeasurements(),
- measurementGroup.getAliasList());
- // filter failed measurement and keep the rest for execution
- for (final Map.Entry<Integer, MetadataException>
failingMeasurement :
- failingMeasurementMap.entrySet()) {
- metadataException = failingMeasurement.getValue();
- if (metadataException.getErrorCode()
- == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- // There's no need to internal create time series.
- alreadyExistingStatus.add(
- RpcUtils.getStatus(
- metadataException.getErrorCode(),
- MeasurementPath.transformDataToString(
- ((MeasurementAlreadyExistException)
metadataException)
- .getMeasurementPath())));
- } else {
- LOGGER.warn(METADATA_ERROR_MSG, metadataException);
- failingStatus.add(
- RpcUtils.getStatus(
- metadataException.getErrorCode(),
metadataException.getMessage()));
- }
+ MeasurementGroup measurementGroup;
+ Map<Integer, MetadataException> failingMeasurementMap;
+ MetadataException metadataException;
+ for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
+ node.getDeviceMap().entrySet()) {
+ measurementGroup = deviceEntry.getValue().right;
+ failingMeasurementMap =
+ schemaRegion.checkMeasurementExistence(
+ deviceEntry.getKey(),
+ measurementGroup.getMeasurements(),
+ measurementGroup.getAliasList());
+ // filter failed measurement and keep the rest for execution
+ for (final Map.Entry<Integer, MetadataException>
failingMeasurement :
+ failingMeasurementMap.entrySet()) {
+ metadataException = failingMeasurement.getValue();
+ if (metadataException.getErrorCode()
+ == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ // There's no need to internal create time series.
+ alreadyExistingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
+ MeasurementPath.transformDataToString(
+ ((MeasurementAlreadyExistException)
metadataException)
+ .getMeasurementPath())));
+ } else {
+ LOGGER.warn(METADATA_ERROR_MSG, metadataException);
+ failingStatus.add(
+ RpcUtils.getStatus(
+ metadataException.getErrorCode(),
metadataException.getMessage()));
}
-
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
}
+
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
}
return processExecutionResultOfInternalCreateSchema(
- receivedFromPipe
- ? super.visitPipeEnrichedWritePlanNode(
- new PipeEnrichedWritePlanNode(node), context)
- : super.visitInternalCreateMultiTimeSeries(node, context),
+ super.visitInternalCreateMultiTimeSeries(node, context),
failingStatus,
alreadyExistingStatus);
} finally {
@@ -938,9 +928,9 @@ public class RegionWriteExecutor {
}
private RegionExecutionResult executeCreateLogicalView(
- CreateLogicalViewNode node,
- WritePlanNodeExecutionContext context,
- boolean receivedFromPipe) {
+ final CreateLogicalViewNode node,
+ final WritePlanNodeExecutionContext context,
+ final boolean receivedFromPipe) {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
if
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
{