This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 707ab89e04a fix missing measurement
707ab89e04a is described below
commit 707ab89e04a387a242b9b09ad5f208b9975effdb
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jul 11 17:51:50 2024 +0800
fix missing measurement
---
.../iotdb/confignode/manager/ConfigManager.java | 88 +++++++++++++---------
.../apache/iotdb/confignode/manager/IManager.java | 7 ++
.../thrift/ConfigNodeRPCServiceProcessor.java | 10 ++-
.../request/ConfigPhysicalPlanSerDeTest.java | 1 +
.../plan/analyze/ClusterPartitionFetcher.java | 38 ++++++----
.../distribute/DistributedPlanGenerator.java | 4 +-
.../relational/sql/ast/WrappedInsertStatement.java | 19 ++++-
.../plan/statement/crud/InsertBaseStatement.java | 7 ++
.../attribute/DeviceAttributeStore.java | 4 +-
.../db/schemaengine/table/DataNodeTableCache.java | 9 +++
.../org/apache/iotdb/db/utils/CommonUtils.java | 6 --
.../iotdb/commons/partition/DataPartition.java | 4 +-
.../org/apache/iotdb/commons/utils/PathUtils.java | 7 ++
13 files changed, 139 insertions(+), 65 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index b3a91e4aa6a..7bc4d725ed8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -701,10 +701,10 @@ public class ConfigManager implements IManager {
@Override
public TSchemaPartitionTableResp getSchemaPartition(PathPatternTree
patternTree) {
// Construct empty response
- TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
return resp.setStatus(status);
}
@@ -737,21 +737,9 @@ public class ConfigManager implements IManager {
}
}
- // Return empty resp if the partitionSlotsMap is empty
- if (partitionSlotsMap.isEmpty()) {
- return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new
HashMap<>());
- }
-
- GetSchemaPartitionPlan getSchemaPartitionPlan =
- new GetSchemaPartitionPlan(
- partitionSlotsMap.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> new
ArrayList<>(e.getValue()))));
- SchemaPartitionResp queryResult =
partitionManager.getSchemaPartition(getSchemaPartitionPlan);
- resp = queryResult.convertToRpcSchemaPartitionTableResp();
-
- LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}",
relatedPaths, resp);
-
- return resp;
+ Map<String, List<TSeriesPartitionSlot>> databaseSlotMap = new HashMap<>();
+ partitionSlotsMap.forEach((db, slots) -> databaseSlotMap.put(db, new
ArrayList<>(slots)));
+ return getSchemaPartition(databaseSlotMap);
}
@Override
@@ -778,11 +766,11 @@ public class ConfigManager implements IManager {
@Override
public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree
patternTree) {
- // Construct empty response
- TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Construct empty response
+ TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
return resp.setStatus(status);
}
@@ -790,40 +778,44 @@ public class ConfigManager implements IManager {
List<String> databases = getClusterSchemaManager().getDatabaseNames();
// Build GetOrCreateSchemaPartitionPlan
- Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new
HashMap<>();
+ Map<String, Set<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
for (IDeviceID deviceID : devicePaths) {
for (String database : databases) {
if (PathUtils.isStartWith(deviceID, database)) {
partitionSlotsMap
- .computeIfAbsent(database, key -> new ArrayList<>())
+ .computeIfAbsent(database, key -> new HashSet<>())
.add(getPartitionManager().getSeriesPartitionSlot(deviceID));
break;
}
}
}
+
+ Map<String, List<TSeriesPartitionSlot>> partitionSlotListMap = new
HashMap<>();
+ partitionSlotsMap.forEach((db, slots) -> partitionSlotListMap.put(db, new
ArrayList<>(slots)));
+ return getOrCreateSchemaPartition(partitionSlotListMap);
+ }
+
+ @Override
+ public TSchemaPartitionTableResp getOrCreateSchemaPartition(
+ Map<String, List<TSeriesPartitionSlot>> dbSlotMap) {
+ // Response is filled from the partition manager's query result below
+ TSchemaPartitionTableResp resp;
GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
- new GetOrCreateSchemaPartitionPlan(partitionSlotsMap);
+ new GetOrCreateSchemaPartitionPlan(dbSlotMap);
SchemaPartitionResp queryResult =
partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
resp = queryResult.convertToRpcSchemaPartitionTableResp();
if (CONF.isEnablePrintingNewlyCreatedPartition()) {
- printNewCreatedSchemaPartition(devicePaths, resp);
+ printNewCreatedSchemaPartition(dbSlotMap, resp);
}
return resp;
}
- private void printNewCreatedSchemaPartition(
- List<IDeviceID> deviceIDS, TSchemaPartitionTableResp resp) {
+ private String partitionTableRespToString(TSchemaPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
- StringBuilder devicePathString = new StringBuilder("{");
- for (IDeviceID deviceID : deviceIDS) {
-
devicePathString.append(lineSeparator).append("\t").append(deviceID).append(",");
- }
- devicePathString.append(lineSeparator).append("}");
-
StringBuilder schemaPartitionRespString = new StringBuilder("{");
schemaPartitionRespString
.append(lineSeparator)
@@ -853,11 +845,37 @@ public class ConfigManager implements IManager {
schemaPartitionRespString.append(lineSeparator).append("\t},");
}
schemaPartitionRespString.append(lineSeparator).append("}");
- LOGGER.debug(
- "[GetOrCreateSchemaPartition]:{}Receive PathPatternTree: {}, Return
TSchemaPartitionTableResp: {}",
- lineSeparator,
- devicePathString,
- schemaPartitionRespString);
+ return schemaPartitionRespString.toString();
+ }
+
+ private void printNewCreatedSchemaPartition(
+ Map<String, List<TSeriesPartitionSlot>> databaseNameSlotMap,
TSchemaPartitionTableResp resp) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "[GetOrCreateSchemaPartition]:{}Receive databaseNameSlotMap: {},
Return TSchemaPartitionTableResp: {}",
+ System.lineSeparator(),
+ databaseNameSlotMap,
+ partitionTableRespToString(resp));
+ }
+ }
+
+ private void printNewCreatedSchemaPartition(
+ List<IDeviceID> deviceIDS, TSchemaPartitionTableResp resp) {
+ final String lineSeparator = System.lineSeparator();
+ StringBuilder devicePathString = new StringBuilder("{");
+ for (IDeviceID deviceID : deviceIDS) {
+
devicePathString.append(lineSeparator).append("\t").append(deviceID).append(",");
+ }
+ devicePathString.append(lineSeparator).append("}");
+
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "[GetOrCreateSchemaPartition]:{}Receive PathPatternTree: {}, Return
TSchemaPartitionTableResp: {}",
+ lineSeparator,
+ devicePathString,
+ partitionTableRespToString(resp));
+ }
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index c280f0fa2a8..fb3804d1bef 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -371,6 +371,13 @@ public interface IManager {
*/
TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree
patternTree);
+ /**
+ * Get or create SchemaPartition with {@code <databaseName, seriesSlots>}.
+ *
+ * @return TSchemaPartitionTableResp
+ */
+ TSchemaPartitionTableResp getOrCreateSchemaPartition(Map<String,
List<TSeriesPartitionSlot>> dbSlotMap);
+
/**
* Create SchemaNodeManagementPartition for child paths node management.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index df2445c67cf..37307c2440f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -525,6 +525,12 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.getSchemaPartition(patternTree);
}
+ @Override
+ public TSchemaPartitionTableResp getSchemaPartitionTableWithSlots(
+ Map<String, List<TSeriesPartitionSlot>> dbSlotMap) {
+ return configManager.getSchemaPartition(dbSlotMap);
+ }
+
@Override
public TSchemaPartitionTableResp
getOrCreateSchemaPartitionTable(TSchemaPartitionReq req) {
PathPatternTree patternTree =
@@ -534,8 +540,8 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSchemaPartitionTableResp getOrCreateSchemaPartitionTableWithSlots(
- Map<String, TSeriesPartitionSlot> dbSlotMap) throws TException {
- return configManager.geto;
+ Map<String, List<TSeriesPartitionSlot>> dbSlotMap) {
+ return configManager.getOrCreateSchemaPartition(dbSlotMap);
}
@Override
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index cfa372be8dd..def1c49f188 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.consensus.request;
+import java.util.Collection;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index f3e634b054f..a1942da7701 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
+import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -26,15 +27,14 @@ import
org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -65,8 +65,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-
public class ClusterPartitionFetcher implements IPartitionFetcher {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -314,13 +312,12 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(Collections.singletonMap(database,
deviceIDs));
if (null == schemaPartition) {
- PathPatternTree tree = new PathPatternTree();
-// tree.appendPathPattern(new PartialPath(database + "." +
MULTI_LEVEL_PATH_WILDCARD));
- for (IDeviceID deviceID : deviceIDs) {
- tree.appendPathPattern(new PartialPath(deviceID.toString()));
- }
+
+ List<TSeriesPartitionSlot> partitionSlots = deviceIDs.stream()
+ .map(partitionExecutor::getSeriesPartitionSlot).distinct().collect(
+ Collectors.toList());
TSchemaPartitionTableResp schemaPartitionTableResp =
-
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(tree));
+
client.getOrCreateSchemaPartitionTableWithSlots(Collections.singletonMap(database,
partitionSlots));
if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
schemaPartition =
parseSchemaPartitionTableResp(schemaPartitionTableResp);
@@ -334,7 +331,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
}
return schemaPartition;
- } catch (ClientManagerException | TException | IllegalPathException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaPartition():" +
e.getMessage());
}
@@ -354,13 +351,22 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
deviceIDs.add(dataPartitionQueryParam.getDeviceID());
}
- Map<IDeviceID, String> deviceToDatabase =
- partitionCache.getDeviceToDatabase(deviceIDs, true, isAutoCreate,
userName);
+ Map<IDeviceID, String> deviceToDatabase = null;
Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
IDeviceID deviceID = dataPartitionQueryParam.getDeviceID();
- if (deviceToDatabase.containsKey(deviceID)) {
- String database = deviceToDatabase.get(deviceID);
+ String database = null;
+ if (dataPartitionQueryParam.getDatabaseName() == null) {
+ if (deviceToDatabase == null) {
+ deviceToDatabase = partitionCache.getDeviceToDatabase(deviceIDs,
true, isAutoCreate, userName);
+ }
+ if (deviceToDatabase.containsKey(deviceID)) {
+ database = deviceToDatabase.get(deviceID);
+ }
+ } else {
+ database = dataPartitionQueryParam.getDatabaseName();
+ }
+ if (database != null) {
result.computeIfAbsent(database, key -> new
ArrayList<>()).add(dataPartitionQueryParam);
}
}
@@ -434,7 +440,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
k,
new TTimeSlotList(
new ArrayList<>(v.timeSlotList), v.needLeftAll,
v.needRightAll)));
- partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+ partitionSlotsMap.put(PathUtils.qualifyDatabaseName(entry.getKey()),
deviceToTimePartitionMap);
}
return new TDataPartitionReq(partitionSlotsMap);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 65041da0c69..c1c40d74fc1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -15,6 +15,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
@@ -56,7 +57,6 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.iotdb.db.utils.CommonUtils;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
@@ -414,7 +414,7 @@ public class DistributedPlanGenerator
SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
String database = ((TableDeviceSourceNode)
node.getChildren().get(0)).getDatabase();
- database = CommonUtils.qualifyDatabaseName(database);
+ database = PathUtils.qualifyDatabaseName(database);
Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index f7e7b1e0c6a..2eeb7b08c7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -19,7 +19,13 @@
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+import java.util.Arrays;
+import java.util.Collections;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -29,12 +35,16 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.TypeFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.tsfile.write.schema.MeasurementSchema;
public abstract class WrappedInsertStatement extends WrappedStatement
implements ITableDeviceSchemaValidation {
@@ -93,7 +103,9 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
}
/**
- * Adjust the order of ID columns in this insertion to be consistent with
that from the schema region.
+ * Adjust the order of ID columns in this insertion to be consistent with
that from the schema
+ * region.
+ *
* @param realColumnSchemas column order from the schema region
*/
public void adjustIdColumns(List<ColumnSchema> realColumnSchemas) {
@@ -154,5 +166,10 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
"Inconsistent column category of column %s: %s/%s",
incoming.getName(), incoming.getColumnCategory(),
real.getColumnCategory()));
}
+ TSDataType tsDataType = InternalTypeManager.getTSDataType(
+ real.getType());
+ MeasurementSchema measurementSchema = new
MeasurementSchema(real.getName(), tsDataType,
+ getDefaultEncoding(tsDataType),
TSFileDescriptor.getInstance().getConfig().getCompressor());
+ innerTreeStatement.setMeasurementSchema(measurementSchema, i);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index e1eda31d7b5..d4fd0356f1c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -122,6 +122,13 @@ public abstract class InsertBaseStatement extends
Statement {
this.measurementSchemas = measurementSchemas;
}
+ public void setMeasurementSchema(MeasurementSchema measurementSchema, int i)
{
+ if (measurementSchemas == null) {
+ measurementSchemas = new MeasurementSchema[measurements.length];
+ }
+ measurementSchemas[i] = measurementSchema;
+ }
+
public boolean isAligned() {
return isAligned;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
index 457b48e0a6c..90a6a156c1f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/DeviceAttributeStore.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.schemaengine.rescon.MemSchemaRegionStatistics;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,7 +125,7 @@ public class DeviceAttributeStore implements
IDeviceAttributeStore {
Map<String, String> attributeMap = new HashMap<>();
String value;
for (int i = 0; i < nameList.size(); i++) {
- value = valueList[i] == null ? null : (String) valueList[i];
+ value = valueList[i] == null ? null : ((Binary)
valueList[i]).getStringValue(TSFileConfig.STRING_CHARSET);
if (value != null) {
attributeMap.put(nameList.get(i), value);
memUsage += MemUsageUtil.computeKVMemUsageInMap(nameList.get(i),
value);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index 3aa287c8f5b..eac20b21ebf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +99,7 @@ public class DataNodeTableCache implements ITableCache {
@Override
public void preCreateTable(String database, TsTable table) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
preCreateTableMap
@@ -111,6 +113,7 @@ public class DataNodeTableCache implements ITableCache {
@Override
public void rollbackCreateTable(String database, String tableName) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
removeTableFromPreCreateMap(database, tableName);
@@ -138,6 +141,7 @@ public class DataNodeTableCache implements ITableCache {
@Override
public void commitCreateTable(String database, String tableName) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
TsTable table = preCreateTableMap.get(database).get(tableName);
@@ -154,6 +158,7 @@ public class DataNodeTableCache implements ITableCache {
@Override
public void preAddTableColumn(
String database, String tableName, List<TsTableColumnSchema>
columnSchemaList) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
preAddColumnMap
@@ -168,6 +173,7 @@ public class DataNodeTableCache implements ITableCache {
@Override
public void commitAddTableColumn(
String database, String tableName, List<TsTableColumnSchema>
columnSchemaList) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
TsTable table = databaseTableMap.get(database).get(tableName);
@@ -192,6 +198,7 @@ public class DataNodeTableCache implements ITableCache {
@Override
public void rollbackAddColumn(
String database, String tableName, List<TsTableColumnSchema>
columnSchemaList) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.writeLock().lock();
try {
preAddColumnMap.compute(
@@ -224,6 +231,7 @@ public class DataNodeTableCache implements ITableCache {
}
public TsTable getTable(String database, String tableName) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.readLock().lock();
try {
if (databaseTableMap.containsKey(database)) {
@@ -236,6 +244,7 @@ public class DataNodeTableCache implements ITableCache {
}
public Optional<List<TsTable>> getTables(String database) {
+ database = PathUtils.qualifyDatabaseName(database);
readWriteLock.readLock().lock();
try {
Map<String, TsTable> tableMap = databaseTableMap.get(database);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 26e04803f5a..9a3308d912b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -475,10 +475,4 @@ public class CommonUtils {
array[j] = tmp;
}
- public static String qualifyDatabaseName(String databaseName) {
- if (databaseName != null && !databaseName.startsWith("root.")) {
- databaseName = "root." + databaseName;
- }
- return databaseName;
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index efeadf14515..b13932da63d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -196,8 +197,7 @@ public class DataPartition extends Partition {
// more than 1 Regions for one timeSlot
List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
- dataBasePartitionMap = dataPartitionMap.get(databaseName);
- LOGGER.info("DataPartitionMap {} and database name {}", dataPartitionMap,
databaseName);
+ dataBasePartitionMap =
dataPartitionMap.get(PathUtils.qualifyDatabaseName(databaseName));
Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotReplicaSetMap =
dataBasePartitionMap.get(seriesPartitionSlot);
for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
index 5b733f248c1..bc2efa7b836 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/PathUtils.java
@@ -204,4 +204,11 @@ public class PathUtils {
}
return src.length() == (src.replace("``", "").length() + num);
}
+
+ public static String qualifyDatabaseName(String databaseName) {
+ if (databaseName != null && !databaseName.startsWith("root.")) {
+ databaseName = "root." + databaseName;
+ }
+ return databaseName;
+ }
}