This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cluster_premerge2 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 78f8d9bfc17c2483da08b59b59708ebe1f2a86c1 Author: jt2594838 <[email protected]> AuthorDate: Mon Jun 8 16:43:57 2020 +0800 next premerge for the distributed version --- cli/src/assembly/resources/sbin/start-cli.sh | 2 +- .../java/org/apache/iotdb/cli/AbstractCli.java | 7 ++ .../iotdb/db/auth/authorizer/BasicAuthorizer.java | 2 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 15 +++ .../engine/storagegroup/StorageGroupProcessor.java | 25 +++- .../db/engine/storagegroup/TsFileResource.java | 10 +- .../version/SimpleFileVersionController.java | 2 +- .../engine/version/SysTimeVersionController.java | 2 + .../iotdb/db/engine/version/VersionController.java | 2 + .../org/apache/iotdb/db/metadata/MManager.java | 45 +++++-- .../java/org/apache/iotdb/db/metadata/MTree.java | 18 ++- .../apache/iotdb/db/qp/constant/SQLConstant.java | 4 +- .../iotdb/db/qp/physical/crud/InsertPlan.java | 38 ++++-- .../db/qp/physical/crud/InsertTabletPlan.java | 138 ++++++++++----------- .../db/qp/physical/sys/ShowTimeSeriesPlan.java | 8 ++ .../iotdb/db/qp/strategy/LogicalGenerator.java | 1 + .../query/dataset/groupby/GroupByFillDataSet.java | 2 +- .../iotdb/db/query/executor/LastQueryExecutor.java | 26 ++-- .../iotdb/db/query/executor/QueryRouter.java | 7 +- .../db/query/executor/RawDataQueryExecutor.java | 2 + .../org/apache/iotdb/db/service/TSServiceImpl.java | 22 ++-- .../org/apache/iotdb/db/utils/CommonUtils.java | 9 +- .../org/apache/iotdb/db/utils/SchemaUtils.java | 7 +- .../org/apache/iotdb/db/utils/SerializeUtils.java | 19 ++- .../apache/iotdb/db/utils/TypeInferenceUtils.java | 4 +- .../apache/iotdb/db/qp/plan/SerializationTest.java | 88 +++++++++++++ .../org/apache/iotdb/db/tools/WalCheckerTest.java | 2 +- service-rpc/src/main/thrift/cluster.thrift | 34 ++++- .../iotdb/tsfile/utils/ReadWriteIOUtils.java | 12 +- .../tsfile/write/schema/MeasurementSchema.java | 3 + ...easurementSchema.java => TimeseriesSchema.java} | 92 +++++++------- 31 files changed, 453 insertions(+), 195 deletions(-) diff --git a/cli/src/assembly/resources/sbin/start-cli.sh b/cli/src/assembly/resources/sbin/start-cli.sh index 3d02904..45bc03c 100644 --- a/cli/src/assembly/resources/sbin/start-cli.sh +++ b/cli/src/assembly/resources/sbin/start-cli.sh @@ -29,7 +29,7 @@ fi MAIN_CLASS=org.apache.iotdb.cli.Cli -CLASSPATH="" +CLASSPATH="." for f in ${IOTDB_CLI_HOME}/lib/*.jar; do CLASSPATH=${CLASSPATH}":"$f done diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java index 36df8f8..c9025e7 100644 --- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java +++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java @@ -346,6 +346,13 @@ public abstract class AbstractCli { executeCommand.append(args[j]).append(" "); } executeCommand.deleteCharAt(executeCommand.length() - 1); + if (executeCommand.charAt(0) == '\'' || executeCommand.charAt(0) == '\"') { + executeCommand.deleteCharAt(0); + } + if (executeCommand.charAt(executeCommand.length() - 1) == '\'' + || executeCommand.charAt(executeCommand.length() - 1) == '\"') { + executeCommand.deleteCharAt(executeCommand.length() - 1); + } execute = executeCommand.toString(); hasExecuteSQL = true; args = Arrays.copyOfRange(args, 0, index); diff --git a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java index 245e3f8..57f6009 100644 --- a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java +++ b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/BasicAuthorizer.java @@ -18,12 +18,12 @@ */ package org.apache.iotdb.db.auth.authorizer; - import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.entity.PrivilegeType; import org.apache.iotdb.db.auth.entity.Role; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 758c77f..4381f59 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -63,6 +63,7 @@ import org.apache.iotdb.db.query.control.QueryFileManager; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.FilePathUtils; +import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.UpgradeUtils; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -570,6 +571,10 @@ public class StorageEngine implements IService { return timePartitionInterval; } + public static void setTimePartitionInterval(long timePartitionInterval) { + StorageEngine.timePartitionInterval = timePartitionInterval; + } + public static long getTimePartition(long time) { return enablePartition ? time / timePartitionInterval : 0; } @@ -584,4 +589,14 @@ public class StorageEngine implements IService { throws StorageEngineException { getProcessor(storageGroup).setPartitionFileVersionToMax(partitionId, newMaxVersion); } + + @TestOnly + public static void setEnablePartition(boolean enablePartition) { + StorageEngine.enablePartition = enablePartition; + } + + @TestOnly + public static boolean isEnablePartition() { + return enablePartition; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 3c22c38..078aedf 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -265,6 +265,14 @@ public class StorageGroupProcessor { } + private Map<Long, List<TsFileResource>> splitResourcesByPartition(List<TsFileResource> resources) { + Map<Long, List<TsFileResource>> ret = new HashMap<>(); + for (TsFileResource resource : resources) { + ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource); + } + return ret; + } + private void recover() throws StorageGroupProcessorException { logger.info("recover Storage Group {}", storageGroupName); @@ -281,8 +289,16 @@ public class StorageGroupProcessor { List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right; upgradeUnseqFileList.addAll(oldUnseqTsFiles); - recoverSeqFiles(tmpSeqTsFiles); - recoverUnseqFiles(tmpUnseqTsFiles); + // split by partition so that we can find the last file of each partition and decide to + // close it or not + Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(tmpSeqTsFiles); + Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = splitResourcesByPartition(tmpUnseqTsFiles); + for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) { + recoverSeqFiles(value); + } + for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) { + recoverUnseqFiles(value); + } for (TsFileResource resource : sequenceFileTreeSet) { long partitionNum = resource.getTimePartition(); @@ -810,8 +826,7 @@ public class StorageGroupProcessor { } } - private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) - throws WriteProcessException { + private void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) { MNode node = null; try { MManager manager = MManager.getInstance(); @@ -829,7 +844,7 @@ public class StorageGroupProcessor { } } } catch (MetadataException e) { - throw new WriteProcessException(e); + // skip last cache update if the local MTree does not contain the schema } finally { if (node != null) { ((InternalMNode) node).readUnlock(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 11ed2dd..17846eb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -68,23 +68,23 @@ public class TsFileResource { public static final String RESOURCE_SUFFIX = ".resource"; static final String TEMP_SUFFIX = ".temp"; private static final String CLOSING_SUFFIX = ".closing"; - private static final int INIT_ARRAY_SIZE = 64; + protected static final int INIT_ARRAY_SIZE = 64; /** * start times array. */ - private long[] startTimes; + protected long[] startTimes; /** * end times array. * The values in this array are Long.MIN_VALUE if it's an unsealed sequence tsfile */ - private long[] endTimes; + protected long[] endTimes; /** * device -> index of start times array and end times array */ - private Map<String, Integer> deviceToIndex; + protected Map<String, Integer> deviceToIndex; public TsFileProcessor getProcessor() { return processor; @@ -242,7 +242,7 @@ public class TsFileResource { } } - private void initTimes(long[] times, long defaultTime) { + protected void initTimes(long[] times, long defaultTime) { Arrays.fill(times, defaultTime); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java index 3bc438f..66d5e20 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java @@ -144,7 +144,7 @@ public class SimpleFileVersionController implements VersionController { } else { versionFile = SystemFileFactory.INSTANCE.getFile(directory, FILE_PREFIX + "0"); prevVersion = 0; - new FileOutputStream(versionFile).close(); + versionFile.createNewFile(); } // prevent overlapping in case of failure currVersion = prevVersion + saveInterval; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java index 3e47cd9..233c3a13 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.engine.version; +import java.io.IOException; + /** * SysTimeVersionController uses system timestamp as the version number. */ diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java index c7faf3e..c982299 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.engine.version; +import java.io.IOException; + /** * VersionController controls the version(a monotonic increasing long) of a FileNode. */ diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 19e3a0a..1fbcdf4 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -71,6 +71,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -657,9 +658,13 @@ public class MManager { * @return A List instance which stores all node at given level */ public List<String> getNodesList(String prefixPath, int nodeLevel) throws MetadataException { + return getNodesList(prefixPath, nodeLevel, null); + } + + public List<String> getNodesList(String prefixPath, int nodeLevel, StorageGroupFilter filter) throws MetadataException { lock.readLock().lock(); try { - return mtree.getNodesList(prefixPath, nodeLevel); + return mtree.getNodesList(prefixPath, nodeLevel, filter); } finally { lock.readLock().unlock(); } @@ -1507,14 +1512,30 @@ public class MManager { } } - public void collectSeries(MNode startingNode, Collection<MeasurementSchema> timeseriesSchemas) { + public void collectTimeseriesSchema(MNode startingNode, Collection<TimeseriesSchema> timeseriesSchemas) { Deque<MNode> nodeDeque = new ArrayDeque<>(); nodeDeque.addLast(startingNode); while (!nodeDeque.isEmpty()) { MNode node = nodeDeque.removeFirst(); if (node instanceof LeafMNode) { MeasurementSchema nodeSchema = ((LeafMNode) node).getSchema(); - timeseriesSchemas.add(new MeasurementSchema(node.getFullPath(), nodeSchema.getType(), + timeseriesSchemas.add(new TimeseriesSchema(node.getFullPath(), nodeSchema.getType(), + nodeSchema.getEncodingType(), nodeSchema.getCompressor())); + } else if (!node.getChildren().isEmpty()) { + nodeDeque.addAll(node.getChildren().values()); + } + } + } + + public void collectMeasurementSchema(MNode startingNode, + Collection<MeasurementSchema> timeseriesSchemas) { + Deque<MNode> nodeDeque = new ArrayDeque<>(); + nodeDeque.addLast(startingNode); + while (!nodeDeque.isEmpty()) { + MNode node = nodeDeque.removeFirst(); + if (node instanceof LeafMNode) { + MeasurementSchema nodeSchema = ((LeafMNode) node).getSchema(); + timeseriesSchemas.add(new MeasurementSchema(node.getName(), nodeSchema.getType(), nodeSchema.getEncodingType(), nodeSchema.getCompressor())); } else if (!node.getChildren().isEmpty()) { nodeDeque.addAll(node.getChildren().values()); @@ -1523,20 +1544,19 @@ public class MManager { } /** - * Collect the timeseries schemas under "startingPath". Notice the measurements in the collected - * MeasurementSchemas are the full path here. + * Collect the timeseries schemas under "startingPath". * * @param startingPath - * @param timeseriesSchemas + * @param measurementSchemas */ - public void collectSeries(String startingPath, List<MeasurementSchema> timeseriesSchemas) { + public void collectSeries(String startingPath, List<MeasurementSchema> measurementSchemas) { MNode mNode; try { mNode = getNodeByPath(startingPath); } catch (MetadataException e) { return; } - collectSeries(mNode, timeseriesSchemas); + collectMeasurementSchema(mNode, measurementSchemas); } /** @@ -1590,4 +1610,13 @@ public class MManager { mRemoteSchemaCache.put(path, schema); } } + + /** + * StorageGroupFilter filters unsatisfied storage groups in metadata queries to speed up and + * deduplicate. + */ + @FunctionalInterface + public interface StorageGroupFilter { + boolean satisfy(String storageGroup); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 6b944ec..28cada3 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; +import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter; import org.apache.iotdb.db.metadata.mnode.InternalMNode; import org.apache.iotdb.db.metadata.mnode.LeafMNode; import org.apache.iotdb.db.metadata.mnode.MNode; @@ -828,6 +829,11 @@ public class MTree implements Serializable { * Get all paths from root to the given level. */ List<String> getNodesList(String path, int nodeLevel) throws MetadataException { + return getNodesList(path, nodeLevel, null); + } + + /** Get all paths from root to the given level */ + List<String> getNodesList(String path, int nodeLevel, StorageGroupFilter filter) throws MetadataException { String[] nodes = MetaUtils.getNodeNames(path); if (!nodes[0].equals(root.getName())) { throw new IllegalPathException(path); @@ -837,11 +843,14 @@ public class MTree implements Serializable { for (int i = 1; i < nodes.length; i++) { if (node.getChild(nodes[i]) != null) { node = node.getChild(nodes[i]); + if (node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) { + return res; + } } else { throw new MetadataException(nodes[i - 1] + " does not have the child node " + nodes[i]); } } - findNodes(node, path, res, nodeLevel - (nodes.length - 1)); + findNodes(node, path, res, nodeLevel - (nodes.length - 1), filter); return res; } @@ -849,8 +858,9 @@ public class MTree implements Serializable { * Get all paths under the given level. * @param targetLevel Record the distance to the target level, 0 means the target level. */ - private void findNodes(MNode node, String path, List<String> res, int targetLevel) { - if (node == null) { + private void findNodes(MNode node, String path, List<String> res, int targetLevel, + StorageGroupFilter filter) { + if (node == null || node instanceof StorageGroupMNode && filter != null && !filter.satisfy(node.getFullPath())) { return; } if (targetLevel == 0) { @@ -859,7 +869,7 @@ public class MTree implements Serializable { } if (node instanceof InternalMNode) { for (MNode child : node.getChildren().values()) { - findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1); + findNodes(child, path + PATH_SEPARATOR + child.toString(), res, targetLevel - 1, filter); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java index 468f286..2b9849a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java @@ -45,8 +45,8 @@ public class SQLConstant { public static final String METADATA_PARAM_EQUAL = "="; public static final String QUOTE = "'"; public static final String DQUOTE = "\""; - public static final String BOOLEN_TRUE = "true"; - public static final String BOOLEN_FALSE = "false"; + public static final String BOOLEAN_TRUE = "true"; + public static final String BOOLEAN_FALSE = "false"; public static final String BOOLEAN_TRUE_NUM = "1"; public static final String BOOLEAN_FALSE_NUM = "0"; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 5641f4b..f160293 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; import org.apache.iotdb.db.conf.IoTDBConstant; + import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -285,12 +286,6 @@ public class InsertPlan extends PhysicalPlan { } } - for (MeasurementSchema schema: schemas) { - if (schema != null) { - schema.serializeTo(stream); - } - } - try { putValues(stream); } catch (QueryProcessException e) { @@ -300,9 +295,14 @@ public class InsertPlan extends PhysicalPlan { private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException { for (int i = 0; i < values.length; i++) { - if (types[i] == null) { + // types are not determined, the situation mainly occurs when the plan uses string values + // and is forwarded to other nodes + if (types == null || types[i] == null) { + ReadWriteIOUtils.write((short) -1, outputStream); + ReadWriteIOUtils.write((String) values[i], outputStream); continue; } + ReadWriteIOUtils.write(types[i], outputStream); switch (types[i]) { case BOOLEAN: @@ -331,9 +331,14 @@ public class InsertPlan extends PhysicalPlan { private void putValues(ByteBuffer buffer) throws QueryProcessException { for (int i = 0; i < values.length; i++) { - if (types[i] == null) { + // types are not determined, the situation mainly occurs when the plan uses string values + // and is forwarded to other nodes + if (types == null || types[i] == null) { + ReadWriteIOUtils.write((short) -1, buffer); + ReadWriteIOUtils.write((String) values[i], buffer); continue; } + ReadWriteIOUtils.write(types[i], buffer); switch (types[i]) { case BOOLEAN: @@ -378,7 +383,15 @@ public class InsertPlan extends PhysicalPlan { public void setValues(ByteBuffer buffer) throws QueryProcessException { for (int i = 0; i < measurements.length; i++) { - types[i] = ReadWriteIOUtils.readDataType(buffer); + // types are not determined, the situation mainly occurs when the plan uses string values + // and is forwarded to other nodes + short typeNum = ReadWriteIOUtils.readShort(buffer); + if (typeNum == -1) { + values[i] = ReadWriteIOUtils.readString(buffer); + continue; + } + + types[i] = TSDataType.deserialize(typeNum); switch (types[i]) { case BOOLEAN: values[i] = ReadWriteIOUtils.readBool(buffer); @@ -423,7 +436,7 @@ public class InsertPlan extends PhysicalPlan { try { putValues(buffer); } catch (QueryProcessException e) { - e.printStackTrace(); + logger.warn("Exception in serialization of InsertPlan", e); } } @@ -444,8 +457,11 @@ public class InsertPlan extends PhysicalPlan { try { setValues(buffer); } catch (QueryProcessException e) { - e.printStackTrace(); + logger.warn("Exception in deserialization of InsertPlan", e); } + + // the types are lost and should be re-inferred + this.inferType = true; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java index 8b6a64c..690ead1 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Set; - import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.utils.QueryDataSetUtils; @@ -58,7 +56,6 @@ public class InsertTabletPlan extends PhysicalPlan { private Object[] columns; private ByteBuffer valueBuffer; - private Set<Integer> index; private int rowCount = 0; // cached values private Long maxTime = null; @@ -76,6 +73,7 @@ public class InsertTabletPlan extends PhysicalPlan { this.deviceId = deviceId; setMeasurements(measurements); } + public InsertTabletPlan(String deviceId, String[] measurements) { super(false, OperatorType.BATCHINSERT); this.deviceId = deviceId; @@ -105,14 +103,6 @@ public class InsertTabletPlan extends PhysicalPlan { this.end = end; } - public Set<Integer> getIndex() { - return index; - } - - public void setIndex(Set<Integer> index) { - this.index = index; - } - @Override public List<Path> getPaths() { if (paths != null) { @@ -142,11 +132,11 @@ public class InsertTabletPlan extends PhysicalPlan { stream.writeShort(dataType.serialize()); } - stream.writeInt(index.size()); + stream.writeInt(end - start); if (timeBuffer == null) { - for(int loc : index){ - stream.writeLong(times[loc]); + for (int i = start; i < end; i++) { + stream.writeLong(times[i]); } } else { stream.write(timeBuffer.array()); @@ -161,58 +151,6 @@ public class InsertTabletPlan extends PhysicalPlan { } } - private void serializeValues(DataOutputStream stream) throws IOException { - for (int i = 0; i < measurements.length; i++) { - serializeColumn(dataTypes[i], columns[i], stream, index); - } - } - - private void serializeColumn(TSDataType dataType, Object column, DataOutputStream stream, - Set<Integer> index) - throws IOException { - switch (dataType) { - case INT32: - int[] intValues = (int[]) column; - for(int loc : index){ - stream.writeInt(intValues[loc]); - } - break; - case INT64: - long[] longValues = (long[]) column; - for(int loc : index){ - stream.writeLong(longValues[loc]); - } - break; - case FLOAT: - float[] floatValues = (float[]) column; - for(int loc : index){ - stream.writeFloat(floatValues[loc]); - } - break; - case DOUBLE: - double[] doubleValues = (double[]) column; - for(int loc : index){ - stream.writeDouble(doubleValues[loc]); - } - break; - case BOOLEAN: - boolean[] boolValues = (boolean[]) column; - for(int loc : index){ - stream.write(BytesUtils.boolToByte(boolValues[loc])); - } - break; - case TEXT: - Binary[] binaryValues = (Binary[]) column; - for(int loc : index){ - stream.writeInt(binaryValues[loc].getLength()); - stream.write(binaryValues[loc].getValues()); - } - break; - default: - throw new UnSupportedDataTypeException( - String.format(DATATYPE_UNSUPPORTED, dataType)); - } - } @Override public void serialize(ByteBuffer buffer) { @@ -249,6 +187,12 @@ public class InsertTabletPlan extends PhysicalPlan { } } + private void serializeValues(DataOutputStream outputStream) throws IOException { + for (int i = 0; i < measurements.length; i++) { + serializeColumn(dataTypes[i], columns[i], outputStream, start, end); + } + } + private void serializeValues(ByteBuffer buffer) { for (int i = 0; i < measurements.length; i++) { serializeColumn(dataTypes[i], columns[i], buffer, start, end); @@ -301,6 +245,52 @@ public class InsertTabletPlan extends PhysicalPlan { } } + private void serializeColumn(TSDataType dataType, Object column, DataOutputStream outputStream, + int start, int end) throws IOException { + switch (dataType) { + case INT32: + int[] intValues = (int[]) column; + for (int j = start; j < end; j++) { + outputStream.writeInt(intValues[j]); + } + break; + case INT64: + long[] longValues = (long[]) column; + for (int j = start; j < end; j++) { + outputStream.writeLong(longValues[j]); + } + break; + case FLOAT: + float[] floatValues = (float[]) column; + for (int j = start; j < end; j++) { + outputStream.writeFloat(floatValues[j]); + } + break; + case DOUBLE: + double[] doubleValues = (double[]) column; + for (int j = start; j < end; j++) { + outputStream.writeDouble(doubleValues[j]); + } + break; + case BOOLEAN: + boolean[] boolValues = (boolean[]) column; + for (int j = start; j < end; j++) { + outputStream.writeByte(BytesUtils.boolToByte(boolValues[j])); + } + break; + case TEXT: + Binary[] binaryValues = (Binary[]) column; + for (int j = start; j < end; j++) { + outputStream.writeInt(binaryValues[j].getLength()); + outputStream.write(binaryValues[j].getValues()); + } + break; + default: + throw new UnSupportedDataTypeException( + String.format(DATATYPE_UNSUPPORTED, dataType)); + } + } + public void setTimeBuffer(ByteBuffer timeBuffer) { this.timeBuffer = timeBuffer; this.timeBuffer.position(0); @@ -360,14 +350,6 @@ public class InsertTabletPlan extends PhysicalPlan { return dataTypes; } - public MeasurementSchema[] getSchemas() { - return schemas; - } - - public void setSchemas(MeasurementSchema[] schemas) { - this.schemas = schemas; - } - public void setDataTypes(List<Integer> dataTypes) { this.dataTypes = new TSDataType[dataTypes.size()]; for (int i = 0; i < dataTypes.size(); i++) { @@ -379,6 +361,14 @@ public class InsertTabletPlan extends PhysicalPlan { this.dataTypes = dataTypes; } + public MeasurementSchema[] getSchemas() { + return schemas; + } + + public void setSchemas(MeasurementSchema[] schemas) { + this.schemas = schemas; + } + public Object[] getColumns() { return columns; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java index 92e764c..9802b67 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowTimeSeriesPlan.java @@ -101,4 +101,12 @@ public class ShowTimeSeriesPlan extends ShowPlan { limit = buffer.getInt(); limit = buffer.getInt(); } + + public void setLimit(int limit) { + this.limit = limit; + } + + public void setOffset(int offset) { + this.offset = offset; + } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java index 43e4c4a..d25f0b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator; import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator; import org.apache.iotdb.db.qp.logical.sys.FlushOperator; import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator; +import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType; import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator; import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator; import org.apache.iotdb.db.qp.logical.sys.MergeOperator; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java index 697cd3b..f83237a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java @@ -90,7 +90,7 @@ public class GroupByFillDataSet extends QueryDataSet { lastTimeArray = new long[paths.size()]; Arrays.fill(lastTimeArray, Long.MAX_VALUE); for (int i = 0; i < paths.size(); i++) { - TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeries( + TimeValuePair lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeriesLocally( paths.get(i), dataTypes.get(i), context, groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice())); if (lastTimeValuePair.getValue() != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java index 7b7fb08..8d7d83d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.LeafMNode; @@ -101,24 +102,33 @@ public class LastQueryExecutor { return dataSet; } + protected TimeValuePair calculateLastPairForOneSeries( + Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements) + throws IOException, QueryProcessException, StorageEngineException { + return calculateLastPairForOneSeriesLocally(seriesPath, tsDataType, context, + deviceMeasurements); + } + /** * get last result for one series * * @param context query context * @return TimeValuePair */ - public static TimeValuePair calculateLastPairForOneSeries( - Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> sensors) + public static TimeValuePair calculateLastPairForOneSeriesLocally( + Path seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements) throws IOException, QueryProcessException, StorageEngineException { // Retrieve last value from MNode - LeafMNode node; + LeafMNode node = null; try { node = (LeafMNode) MManager.getInstance().getNodeByPath(seriesPath.toString()); + } catch (PathNotExistException e) { + // TODO use last cache for remote series } catch (MetadataException e) { throw new QueryProcessException(e); } - if (node.getCachedLast() != null) { + if (node != null && node.getCachedLast() != null) { return node.getCachedLast(); } @@ -133,7 +143,7 @@ public class LastQueryExecutor { if (!seqFileResources.isEmpty()) { for (int i = seqFileResources.size() - 1; i >= 0; i--) { TimeseriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( - seqFileResources.get(i), seriesPath, context, null, sensors); + seqFileResources.get(i), seriesPath, context, null, deviceMeasurements); if (timeseriesMetadata != null) { if (!timeseriesMetadata.isModified()) { Statistics timeseriesMetadataStats = timeseriesMetadata.getStatistics(); @@ -163,7 +173,7 @@ public class LastQueryExecutor { continue; } TimeseriesMetadata timeseriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, sensors); + FileLoaderUtils.loadTimeSeriesMetadata(resource, seriesPath, context, null, deviceMeasurements); if (timeseriesMetadata != null) { for (ChunkMetadata chunkMetaData : timeseriesMetadata.loadChunkMetadataList()) { if (chunkMetaData.getEndTime() > resultPair.getTimestamp() @@ -180,7 +190,9 @@ public class LastQueryExecutor { } // Update cached last value with low priority - node.updateCachedLast(resultPair, false, Long.MIN_VALUE); + if (node != null) { + node.updateCachedLast(resultPair, false, Long.MIN_VALUE); + } return resultPair; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 4d2070f..98e04a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -217,7 +217,12 @@ public class QueryRouter implements IQueryRouter { @Override public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context) throws StorageEngineException, QueryProcessException, IOException { - LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan); + LastQueryExecutor lastQueryExecutor = getLastQueryExecutor(lastQueryPlan); return lastQueryExecutor.execute(context, lastQueryPlan); } + + protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) { + return new LastQueryExecutor(lastQueryPlan); + } + } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java index 2a4d63f..9acb3d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java @@ -43,6 +43,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths; diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 7ae24a9..8267115 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -521,7 +521,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } /** - * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some + * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some * AuthorPlan */ private TSExecuteStatementResp internalExecuteQueryStatement(String statement, @@ -952,7 +952,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return new TSExecuteStatementResp(status); } - status = executePlan(plan); + status = executeNonQueryPlan(plan); TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status); long queryId = generateQueryId(false); resp.setQueryId(queryId); @@ -1079,7 +1079,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { resp.addToStatusList(status); } else { - resp.addToStatusList(executePlan(plan)); + resp.addToStatusList(executeNonQueryPlan(plan)); } } catch (Exception e) { logger.error("meet error when insert in batch", e); @@ -1135,7 +1135,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { return status; } - return executePlan(plan); + return executeNonQueryPlan(plan); } catch (Exception e) { logger.error("meet error when insert", e); } @@ -1161,7 +1161,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { return new TSStatus(status); } - return new TSStatus(executePlan(plan)); + return new TSStatus(executeNonQueryPlan(plan)); } @Override @@ -1285,7 +1285,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { return new TSStatus(status); } - return new TSStatus(executePlan(plan)); + return new TSStatus(executeNonQueryPlan(plan)); } @Override @@ -1303,7 +1303,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { return new TSStatus(status); } - return new TSStatus(executePlan(plan)); + return new TSStatus(executeNonQueryPlan(plan)); } @Override @@ -1326,7 +1326,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { return status; } - return executePlan(plan); + return executeNonQueryPlan(plan); } @Override @@ -1359,7 +1359,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { continue; } - statusList.add(executePlan(plan)); + statusList.add(executeNonQueryPlan(plan)); } boolean isAllSuccessful = true; @@ -1396,7 +1396,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (status != null) { return status; } - return executePlan(plan); + return executeNonQueryPlan(plan); } @Override @@ -1424,7 +1424,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return null; } - protected TSStatus executePlan(PhysicalPlan plan) { + protected TSStatus executeNonQueryPlan(PhysicalPlan plan) { boolean execRet; try { execRet = executeNonQuery(plan); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java index c6575de..00ce1ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java @@ -73,11 +73,10 @@ public class CommonUtils { switch (dataType) { case BOOLEAN: value = value.toLowerCase(); - if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE - .equals(value)) { + if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEAN_FALSE.equals(value)) { return false; } - if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) { + if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEAN_TRUE.equals(value)) { return true; } throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1"); @@ -115,11 +114,11 @@ public class CommonUtils { switch (dataType) { case BOOLEAN: value = value.toLowerCase(); - if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEN_FALSE + if (SQLConstant.BOOLEAN_FALSE_NUM.equals(value) || SQLConstant.BOOLEAN_FALSE .equals(value)) { return false; } - if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEN_TRUE.equals(value)) { + if (SQLConstant.BOOLEAN_TRUE_NUM.equals(value) || SQLConstant.BOOLEAN_TRUE.equals(value)) { return true; } throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1"); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java index fd359e1..d715152 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java @@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +43,10 @@ public class SchemaUtils { private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class); - public static void registerTimeseries(MeasurementSchema schema) { + public static void registerTimeseries(TimeseriesSchema schema) { try { logger.debug("Registering timeseries {}", schema); - String path = schema.getMeasurementId(); + String path = schema.getFullPath(); TSDataType dataType = schema.getType(); TSEncoding encoding = schema.getEncodingType(); CompressionType compressionType = schema.getCompressor(); @@ -54,7 +55,7 @@ public class SchemaUtils { } catch (PathAlreadyExistException ignored) { // ignore added timeseries } catch (MetadataException e) { - logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getMeasurementId(), + logger.error("Cannot create timeseries {} in snapshot, ignored", schema.getFullPath(), e); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java index ce1e755..7141720 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/SerializeUtils.java @@ -208,7 +208,7 @@ public class SerializeUtils { } public static BatchData deserializeBatchData(ByteBuffer buffer) { - if (buffer == null || buffer.limit() == 0) { + if (buffer == null || (buffer.limit() - buffer.position()) == 0) { return null; } @@ -494,4 +494,21 @@ public class SerializeUtils { } return ret; } + + public static Node stringToNode(String str) { + int ipFirstPos = str.indexOf("ip:", 0) + "ip:".length(); + int ipLastPos = str.indexOf(',', ipFirstPos); + int metaPortFirstPos = str.indexOf("metaPort:", ipLastPos) + "metaPort:".length(); + int metaPortLastPos = str.indexOf(',', metaPortFirstPos); + int idFirstPos = str.indexOf("nodeIdentifier:", metaPortLastPos) + "nodeIdentifier:".length(); + int idLastPos = str.indexOf(',', idFirstPos); + int dataPortFirstPos = str.indexOf("dataPort:", idLastPos) + "dataPort:".length(); + int dataPortLastPos = str.indexOf(')', dataPortFirstPos); + + String ip = str.substring(ipFirstPos, ipLastPos); + int metaPort = Integer.parseInt(str.substring(metaPortFirstPos, metaPortLastPos)); + int id = Integer.parseInt(str.substring(idFirstPos, idLastPos)); + int dataPort = Integer.parseInt(str.substring(dataPortFirstPos, dataPortLastPos)); + return new Node(ip, metaPort, id, dataPort); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java index 2e9981d..0f5de79 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java @@ -46,8 +46,8 @@ public class TypeInferenceUtils { } private static boolean isBoolean(String s) { - return s.equalsIgnoreCase(SQLConstant.BOOLEN_TRUE) || s - .equalsIgnoreCase(SQLConstant.BOOLEN_FALSE); + return s.equalsIgnoreCase(SQLConstant.BOOLEAN_TRUE) || s + .equalsIgnoreCase(SQLConstant.BOOLEAN_FALSE); } /** diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java new file mode 100644 index 0000000..9269a52 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/SerializationTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.qp.plan; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.qp.Planner; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class SerializationTest { + + private Planner processor = new Planner(); + + @Before + public void before() throws MetadataException { + MManager.getInstance().init(); + MManager.getInstance().setStorageGroup("root.vehicle"); + MManager.getInstance() + .createTimeseries("root.vehicle.d1.s1", TSDataType.FLOAT, TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED, null); + MManager.getInstance() + .createTimeseries("root.vehicle.d2.s1", TSDataType.FLOAT, TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED, null); + MManager.getInstance() + .createTimeseries("root.vehicle.d3.s1", TSDataType.FLOAT, TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED, null); + MManager.getInstance() + .createTimeseries("root.vehicle.d4.s1", TSDataType.FLOAT, TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED, null); + } + + @After + public void clean() throws IOException { + MManager.getInstance().clear(); + EnvironmentUtils.cleanAllDir(); + } + + @Test + public void testInsert() throws QueryProcessException, IOException { + String sqlStr = "INSERT INTO root.vehicle.d1(timestamp, s1) VALUES (1, 5.0)"; + PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + plan.serialize(dataOutputStream); + ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); + PhysicalPlan planB = PhysicalPlan.Factory.create(buffer); + assertEquals(plan, planB); + } + + ByteBuffer buffer = ByteBuffer.allocate(4096); + plan.serialize(buffer); + buffer.flip(); + PhysicalPlan planB = PhysicalPlan.Factory.create(buffer); + assertEquals(plan, planB); + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java index 49efed9..b4de634 100644 --- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java @@ -80,7 +80,7 @@ public class WalCheckerTest { TSDataType[] types = new TSDataType[]{TSDataType.INT64, TSDataType.INT64, TSDataType.INT64}; String[] values = new String[]{"5", "6", "7"}; for (int j = 0; j < 10; j++) { - new InsertPlan(deviceId, j, measurements, values).serialize(binaryPlans); + new InsertPlan(deviceId, j, measurements, types, values).serialize(binaryPlans); } binaryPlans.flip(); logWriter.write(binaryPlans); diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift index b59bee8..29af9a5 100644 --- a/service-rpc/src/main/thrift/cluster.thrift +++ b/service-rpc/src/main/thrift/cluster.thrift @@ -120,6 +120,7 @@ struct StartUpStatus { 1: required long partitionInterval 2: required int hashSalt 3: required int replicationNumber + 4: required list<Node> seedNodeList } // follower -> leader @@ -127,6 +128,7 @@ struct CheckStatusResponse { 1: required bool partitionalIntervalEquals 2: required bool hashSaltEquals 3: required bool replicationNumEquals + 4: required bool seedNodeEquals } struct SendSnapshotRequest { @@ -212,6 +214,16 @@ struct GroupByRequest { 8: required set<string> deviceMeasurements } +struct LastRequest { + 1: required string path + 2: required int dataTypeOrdinal + 3: required long queryId + 4: required set<string> deviceMeasurements + 5: required Node header + 6: required Node requestor +} + + service RaftService { /** * Leader will call this method to all followers to ensure its authority. @@ -269,7 +281,18 @@ service RaftService { **/ long requestCommitIndex(1:Node header) + + /** + * Read a chunk of a file from the client. If the remaining of the file does not have enough + * bytes, only the remaining will be returned. + * Notice that when the last chunk of the file is read, the file will be deleted immediately. + **/ binary readFile(1:string filePath, 2:i64 offset, 3:i32 length) + + /** + * Test if a log of "index" and "term" exists. + **/ + bool matchTerm(1:long index, 2:long term, 3:Node header) } @@ -325,7 +348,6 @@ service TSDataService extends RaftService { binary getAllMeasurementSchema(1: Node header, 2: binary planBinary) - list<binary> getAggrResult(1:GetAggrResultRequest request) PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request) @@ -351,8 +373,15 @@ service TSDataService extends RaftService { /** * Perform a previous fill and return the timevalue pair in binary. + * @return a binary TimeValuePair **/ binary previousFill(1: PreviousFillRequest request) + + /** + * Query the last point of the series. + * @return a binary TimeValuePair + **/ + binary last(1: LastRequest request) } service TSMetaService extends RaftService { @@ -366,6 +395,9 @@ service TSMetaService extends RaftService { **/ AddNodeResponse addNode(1: Node node, 2: StartUpStatus startUpStatus) + + CheckStatusResponse checkStatus(1: StartUpStatus startUpStatus) + /** * Remove a node from the cluster. If the node is not in the cluster or the cluster size will * less than replication number, the request will be rejected. diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java index 1173749..d89f917 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java @@ -133,16 +133,16 @@ public class ReadWriteIOUtils { public static int write(Map<String, String> map, DataOutputStream stream) throws IOException { int length = 0; byte[] bytes; - stream.write(map.size()); + stream.writeInt(map.size()); length += 4; for (Entry<String, String> entry : map.entrySet()) { bytes = entry.getKey().getBytes(); - stream.write(bytes.length); + stream.writeInt(bytes.length); length += 4; stream.write(bytes); length += bytes.length; bytes = entry.getValue().getBytes(); - stream.write(bytes.length); + stream.writeInt(bytes.length); length += 4; stream.write(bytes); length += bytes.length; @@ -344,6 +344,9 @@ public class ReadWriteIOUtils { * @return the length of string represented by byte[]. */ public static int write(String s, ByteBuffer buffer) { + if (s == null) { + return write(-1, buffer); + } int len = 0; byte[] bytes = s.getBytes(); len += write(bytes.length, buffer); @@ -563,6 +566,9 @@ public class ReadWriteIOUtils { */ public static String readString(ByteBuffer buffer) { int strLength = readInt(buffer); + if (strLength < 0) { + return null; + } byte[] bytes = new byte[strLength]; buffer.get(bytes, 0, strLength); return new String(bytes, 0, strLength); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java index 848e0b3..0ef47ea 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java @@ -294,4 +294,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali return sc.toString(); } + public void setType(TSDataType type) { + this.type = type; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java similarity index 70% copy from tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java copy to tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java index 848e0b3..31dbf5f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/TimeseriesSchema.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.tsfile.write.schema; import java.io.IOException; @@ -37,25 +38,22 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.utils.StringContainer; /** - * This class describes a measurement's information registered in {@linkplain Schema FileSchema}, - * including measurement id, data type, encoding and compressor type. For each TSEncoding, - * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has - * TSDataTypeConverter up to now. + * TimeseriesSchema is like MeasurementSchema, but instead of measurementId, it stores the full + * path. */ -public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable { - - private String measurementId; +public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializable { + private String fullPath; private TSDataType type; private TSEncoding encoding; private TSEncodingBuilder encodingConverter; private CompressionType compressor; private Map<String, String> props = new HashMap<>(); - public MeasurementSchema() { + public TimeseriesSchema() { } - public MeasurementSchema(String measurementId, TSDataType tsDataType) { - this(measurementId, tsDataType, + public TimeseriesSchema(String fullPath, TSDataType tsDataType) { + this(fullPath, tsDataType, TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()), TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); @@ -64,27 +62,27 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali /** * set properties as an empty Map. */ - public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding) { - this(measurementId, type, encoding, + public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) { + this(fullPath, type, encoding, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); } - public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, + public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding, CompressionType compressionType) { - this(measurementId, type, encoding, compressionType, Collections.emptyMap()); + this(fullPath, type, encoding, compressionType, Collections.emptyMap()); } /** - * Constructor of MeasurementSchema. + * Constructor of TimeseriesSchema. * * <p>props - information in encoding method. For RLE, Encoder.MAX_POINT_NUMBER For PLAIN, * Encoder.maxStringLength */ - public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, + public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding, CompressionType compressionType, Map<String, String> props) { this.type = type; - this.measurementId = measurementId; + this.fullPath = fullPath; this.encoding = encoding; this.props = props == null ? Collections.emptyMap() : props; this.compressor = compressionType; @@ -93,67 +91,67 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali /** * function for deserializing data from input stream. */ - public static MeasurementSchema deserializeFrom(InputStream inputStream) throws IOException { - MeasurementSchema measurementSchema = new MeasurementSchema(); + public static TimeseriesSchema deserializeFrom(InputStream inputStream) throws IOException { + TimeseriesSchema TimeseriesSchema = new TimeseriesSchema(); - measurementSchema.measurementId = ReadWriteIOUtils.readString(inputStream); + TimeseriesSchema.fullPath = ReadWriteIOUtils.readString(inputStream); - measurementSchema.type = ReadWriteIOUtils.readDataType(inputStream); + TimeseriesSchema.type = ReadWriteIOUtils.readDataType(inputStream); - measurementSchema.encoding = ReadWriteIOUtils.readEncoding(inputStream); + TimeseriesSchema.encoding = ReadWriteIOUtils.readEncoding(inputStream); - measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(inputStream); + TimeseriesSchema.compressor = ReadWriteIOUtils.readCompressionType(inputStream); int size = ReadWriteIOUtils.readInt(inputStream); if (size > 0) { - measurementSchema.props = new HashMap<>(); + TimeseriesSchema.props = new HashMap<>(); String key; String value; for (int i = 0; i < size; i++) { key = ReadWriteIOUtils.readString(inputStream); value = ReadWriteIOUtils.readString(inputStream); - measurementSchema.props.put(key, value); + TimeseriesSchema.props.put(key, value); } } - return measurementSchema; + return TimeseriesSchema; } /** * function for deserializing data from byte buffer. */ - public static MeasurementSchema deserializeFrom(ByteBuffer buffer) { - MeasurementSchema measurementSchema = new MeasurementSchema(); + public static TimeseriesSchema deserializeFrom(ByteBuffer buffer) { + TimeseriesSchema TimeseriesSchema = new TimeseriesSchema(); - measurementSchema.measurementId = ReadWriteIOUtils.readString(buffer); + TimeseriesSchema.fullPath = ReadWriteIOUtils.readString(buffer); - measurementSchema.type = ReadWriteIOUtils.readDataType(buffer); + TimeseriesSchema.type = ReadWriteIOUtils.readDataType(buffer); - measurementSchema.encoding = ReadWriteIOUtils.readEncoding(buffer); + TimeseriesSchema.encoding = ReadWriteIOUtils.readEncoding(buffer); - measurementSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer); + TimeseriesSchema.compressor = ReadWriteIOUtils.readCompressionType(buffer); int size = ReadWriteIOUtils.readInt(buffer); if (size > 0) { - measurementSchema.props = new HashMap<>(); + TimeseriesSchema.props = new HashMap<>(); String key; String value; for (int i = 0; i < size; i++) { key = ReadWriteIOUtils.readString(buffer); value = ReadWriteIOUtils.readString(buffer); - measurementSchema.props.put(key, value); + TimeseriesSchema.props.put(key, value); } } - return measurementSchema; + return TimeseriesSchema; } - public String getMeasurementId() { - return measurementId; + public String getFullPath() { + return fullPath; } - public void setMeasurementId(String measurementId) { - this.measurementId = measurementId; + public void setFullPath(String fullPath) { + this.fullPath = fullPath; } public Map<String, String> getProps() { @@ -205,7 +203,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali public int serializeTo(OutputStream outputStream) throws IOException { int byteLen = 0; - byteLen += ReadWriteIOUtils.write(measurementId, outputStream); + byteLen += ReadWriteIOUtils.write(fullPath, outputStream); byteLen += ReadWriteIOUtils.write(type, outputStream); @@ -232,7 +230,7 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali public int serializeTo(ByteBuffer buffer) { int byteLen = 0; - byteLen += ReadWriteIOUtils.write(measurementId, buffer); + byteLen += ReadWriteIOUtils.write(fullPath, buffer); byteLen += ReadWriteIOUtils.write(type, buffer); @@ -261,33 +259,33 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali if (o == null || getClass() != o.getClass()) { return false; } - MeasurementSchema that = (MeasurementSchema) o; + TimeseriesSchema that = (TimeseriesSchema) o; return type == that.type && encoding == that.encoding && Objects - .equals(measurementId, that.measurementId) + .equals(fullPath, that.fullPath) && Objects.equals(compressor, that.compressor); } @Override public int hashCode() { - return Objects.hash(type, encoding, measurementId, compressor); + return Objects.hash(type, encoding, fullPath, compressor); } /** * compare by measurementID. */ @Override - public int compareTo(MeasurementSchema o) { + public int compareTo(TimeseriesSchema o) { if (equals(o)) { return 0; } else { - return this.measurementId.compareTo(o.measurementId); + return this.fullPath.compareTo(o.fullPath); } } @Override public String toString() { StringContainer sc = new StringContainer(""); - sc.addTail("[", measurementId, ",", type.toString(), ",", encoding.toString(), ",", + sc.addTail("[", fullPath, ",", type.toString(), ",", encoding.toString(), ",", props.toString(), ",", compressor.toString()); sc.addTail("]");
