This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch fast_write_test_0423
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_0423 by this
push:
new 2501b0ba9c add schema fill
2501b0ba9c is described below
commit 2501b0ba9c3dc68f8e388431f3ef4efe9e948b31
Author: MarcosZyk <[email protected]>
AuthorDate: Mon Apr 24 10:55:31 2023 +0800
add schema fill
---
.../iotdb/db/metadata/template/Template.java | 22 ++++++
.../plan/analyze/schema/ClusterSchemaFetcher.java | 79 ++++++++++++++++++++++
.../plan/analyze/schema/ISchemaComputation.java | 2 +
.../db/mpp/plan/analyze/schema/ISchemaFetcher.java | 6 ++
.../mpp/plan/analyze/schema/SchemaValidator.java | 18 ++++-
.../planner/plan/node/write/FastInsertRowNode.java | 25 +++++++
6 files changed, 149 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
index 9cf66897e0..3810d68b61 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -34,6 +34,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +47,8 @@ public class Template implements Serializable {
private boolean isDirectAligned;
private Map<String, IMeasurementSchema> schemaMap;
+ private List<String> measurementsInorder;
+
private transient int rehashCode;
public Template() {
@@ -72,11 +76,13 @@ public class Template implements Serializable {
this.isDirectAligned = isAligned;
this.schemaMap = new ConcurrentHashMap<>();
this.name = name;
+ this.measurementsInorder = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < measurements.size(); i++) {
IMeasurementSchema schema =
new MeasurementSchema(
measurements.get(i), dataTypes.get(i), encodings.get(i),
compressors.get(i));
schemaMap.put(schema.getMeasurementId(), schema);
+ measurementsInorder.add(measurements.get(i));
}
}
@@ -144,6 +150,7 @@ public class Template implements Serializable {
IMeasurementSchema schema =
constructSchema(measurements[i], dataTypes[i], encodings[i],
compressors[i]);
schemaMap.put(measurements[i], schema);
+ measurementsInorder.add(measurements[i]);
}
}
@@ -153,6 +160,11 @@ public class Template implements Serializable {
TSEncoding encoding,
CompressionType compressionType) {
schemaMap.put(measurement, constructSchema(measurement, dataType,
encoding, compressionType));
+ measurementsInorder.add(measurement);
+ }
+
+ public List<String> getMeasurementsInorder() {
+ return measurementsInorder;
}
// endregion
@@ -166,6 +178,9 @@ public class Template implements Serializable {
ReadWriteIOUtils.write(entry.getKey(), buffer);
entry.getValue().partialSerializeTo(buffer);
}
+ for (String measurement : measurementsInorder) {
+ ReadWriteIOUtils.write(measurement, buffer);
+ }
}
public void serialize(OutputStream outputStream) throws IOException {
@@ -177,6 +192,9 @@ public class Template implements Serializable {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
entry.getValue().partialSerializeTo(outputStream);
}
+ for (String measurement : measurementsInorder) {
+ ReadWriteIOUtils.write(measurement, outputStream);
+ }
}
public ByteBuffer serialize() {
@@ -206,6 +224,10 @@ public class Template implements Serializable {
}
schemaMap.put(schemaName, measurementSchema);
}
+ measurementsInorder = Collections.synchronizedList(new ArrayList<>());
+ for (int i = 0; i < schemaSize; i++) {
+ measurementsInorder.add(ReadWriteIOUtils.readString(buffer));
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 18a860f2d4..57747c1829 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.cache.DataNodeTemplateSchemaCache;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
@@ -361,6 +362,84 @@ public class ClusterSchemaFetcher implements
ISchemaFetcher {
}
}
+ @Override
+ public void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+ List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList) {
+ // The schema cache R/W and fetch operation must be locked together thus
the cache clean
+ // operation executed by delete timeseries will be effective.
+ schemaCache.takeReadLock();
+ templateSchemaCache.takeReadLock();
+ try {
+ List<ISchemaComputationWithAutoCreation> templateTimeSeriesRequestList =
new ArrayList<>();
+ List<Pair<Template, PartialPath>> templateSetInfoList = new
ArrayList<>();
+ Pair<Template, PartialPath> templateSetInfo;
+ for (ISchemaComputationWithAutoCreation
schemaComputationWithAutoCreation :
+ schemaComputationWithAutoCreationList) {
+ templateSetInfo =
+
templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
+ if (templateSetInfo == null) {
+ throw new SemanticException(
+ "There's no template on prefix path of"
+ + schemaComputationWithAutoCreation.getDevicePath());
+ }
+ templateTimeSeriesRequestList.add(schemaComputationWithAutoCreation);
+ templateSetInfoList.add(templateSetInfo);
+
+ schemaComputationWithAutoCreation.initMeasurementSchemaContainer(
+ templateSetInfo.left.getMeasurementNumber(),
+ templateSetInfo.getLeft().getMeasurementsInorder().toArray(new
String[0]));
+ }
+
+ if (!templateTimeSeriesRequestList.isEmpty()) {
+ templateSchemaFetcher.processTemplateTimeSeries(
+ templateSetInfoList, templateTimeSeriesRequestList);
+ }
+ } finally {
+ schemaCache.releaseReadLock();
+ templateSchemaCache.releaseReadLock();
+ }
+ }
+
+ @Override
+ public void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+ schemaCache.takeReadLock();
+ templateSchemaCache.takeReadLock();
+ try {
+
+ Pair<Template, PartialPath> templateSetInfo =
+
templateManager.checkTemplateSetInfo(schemaComputationWithAutoCreation.getDevicePath());
+ List<Integer> indexOfMissingMeasurements;
+ if (templateSetInfo == null) {
+ throw new SemanticException(
+ "There's no template on prefix path of"
+ + schemaComputationWithAutoCreation.getDevicePath());
+ }
+
+ schemaComputationWithAutoCreation.initMeasurementSchemaContainer(
+ templateSetInfo.left.getMeasurementNumber(),
+ templateSetInfo.getLeft().getMeasurementsInorder().toArray(new
String[0]));
+
+ // template timeseries
+ indexOfMissingMeasurements =
+ templateSchemaFetcher.processTemplateTimeSeries(
+ templateSetInfo, schemaComputationWithAutoCreation);
+
+ // all schema has been taken and processed
+ if (indexOfMissingMeasurements.isEmpty()) {
+ return;
+ }
+
+ // offer null for the rest missing schema processing
+ for (int index : indexOfMissingMeasurements) {
+ schemaComputationWithAutoCreation.computeMeasurement(index, null);
+ }
+ } finally {
+ schemaCache.releaseReadLock();
+ templateSchemaCache.releaseReadLock();
+ }
+ }
+
@Override
public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath
devicePath) {
return templateManager.checkTemplateSetInfo(devicePath);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
index 8fdc9a9fcc..c57f35dc00 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaComputation.java
@@ -40,4 +40,6 @@ public interface ISchemaComputation {
* @param measurementSchemaInfo the measurement schema of fetched measurement
*/
void computeMeasurement(int index, IMeasurementSchemaInfo
measurementSchemaInfo);
+
+ default void initMeasurementSchemaContainer(int size, String[] measurements)
{}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 450c4dc3b0..3a76e06e2d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -76,6 +76,12 @@ public interface ISchemaFetcher {
void fetchAndComputeSchemaWithAutoCreate(
List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList);
+ default void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {}
+
+ default void fetchAndComputeSchemaWithAutoCreateForFastWrite(
+ List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList) {}
+
ISchemaTree fetchSchemaListWithAutoCreate(
List<PartialPath> devicePath,
List<String[]> measurements,
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 6cbd587a00..2d6b10ec94 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.FastInsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -38,10 +40,20 @@ public class SchemaValidator {
public static void validate(InsertNode insertNode) {
try {
if (insertNode instanceof BatchInsertNode) {
- SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
- ((BatchInsertNode) insertNode).getSchemaValidationList());
+ if (insertNode instanceof FastInsertRowsNode) {
+ SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreateForFastWrite(
+ ((BatchInsertNode) insertNode).getSchemaValidationList());
+ } else {
+ SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
+ ((BatchInsertNode) insertNode).getSchemaValidationList());
+ }
} else {
-
SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertNode.getSchemaValidation());
+ if (insertNode instanceof FastInsertRowNode) {
+ SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreateForFastWrite(
+ insertNode.getSchemaValidation());
+ } else {
+
SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertNode.getSchemaValidation());
+ }
}
insertNode.updateAfterSchemaValidation();
} catch (QueryProcessException e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
index 68bddffcac..80ea5de6a4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/FastInsertRowNode.java
@@ -21,10 +21,14 @@ package
org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -106,4 +110,25 @@ public class FastInsertRowNode extends InsertRowNode {
byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, length);
this.rawValues = ByteBuffer.wrap(bytes);
}
+
+ @Override
+ public void initMeasurementSchemaContainer(int size, String[] measurements) {
+ this.measurementSchemas = new MeasurementSchema[size];
+ this.measurements = measurements;
+ this.dataTypes = new TSDataType[size];
+ }
+
+ @Override
+ public void validateDeviceSchema(boolean isAligned) {
+ this.isAligned = isAligned;
+ }
+
+ @Override
+ public void updateAfterSchemaValidation() throws QueryProcessException {}
+
+ @Override
+ public void validateMeasurementSchema(int index, IMeasurementSchemaInfo
measurementSchemaInfo) {
+ measurementSchemas[index] = measurementSchemaInfo.getSchema();
+ this.dataTypes[index] = measurementSchemaInfo.getSchema().getType();
+ }
}