This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TagUpsert in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b5ad99aa75bdc4718183ebff3f257e71c42941df Author: JackieTien97 <[email protected]> AuthorDate: Mon Apr 27 10:50:52 2020 +0800 support upsert in alter timeseries tag/attribute syntax --- .../1-DDL Data Definition Language.md | 6 +- .../5-Operation Manual/4-SQL Reference.md | 8 + .../1-DDL Data Definition Language.md | 5 + .../5-Operation Manual/4-SQL Reference.md | 8 + .../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 5 + .../org/apache/iotdb/db/metadata/MManager.java | 166 +++++++++++++++++---- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 89 +++++++++-- .../db/qp/logical/sys/AlterTimeSeriesOperator.java | 32 +++- .../db/qp/physical/sys/AlterTimeSeriesPlan.java | 40 ++++- .../iotdb/db/qp/strategy/LogicalGenerator.java | 56 +++---- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 4 +- .../iotdb/db/integration/IoTDBTagAlterIT.java | 104 ++++++++++++- 12 files changed, 432 insertions(+), 91 deletions(-) diff --git a/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md b/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md index 03e1efa..61bf416 100644 --- a/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md +++ b/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md @@ -111,7 +111,11 @@ ALTER timeseries root.turbine.d1.s1 ADD TAGS tag3=v3, tag4=v4 ``` ALTER timeseries root.turbine.d1.s1 ADD ATTRIBUTES attr3=v3, attr4=v4 ``` - +* upsert tags and attributes +> add a new key-value pair if the key doesn't exist; otherwise, update the existing value with the new one. 
+``` +ALTER timeseries root.turbine.d1.s1 UPSERT TAGS(tag3=v3, tag4=v4) ATTRIBUTES(attr3=v3, attr4=v4) +``` ## Show Timeseries diff --git a/docs/UserGuide/5-Operation Manual/4-SQL Reference.md b/docs/UserGuide/5-Operation Manual/4-SQL Reference.md index 75c1e61..6562e63 100644 --- a/docs/UserGuide/5-Operation Manual/4-SQL Reference.md +++ b/docs/UserGuide/5-Operation Manual/4-SQL Reference.md @@ -113,12 +113,20 @@ alterClause | DROP ID (COMMA ID)* | ADD TAGS property (COMMA property)* | ADD ATTRIBUTES property (COMMA property)* + | UPSERT tagClause attributeClause + ; +attributeClause + : (ATTRIBUTES LR_BRACKET property (COMMA property)* RR_BRACKET)? + ; +tagClause + : (TAGS LR_BRACKET property (COMMA property)* RR_BRACKET)? ; Eg: ALTER timeseries root.turbine.d1.s1 RENAME tag1 TO newTag1 Eg: ALTER timeseries root.turbine.d1.s1 SET tag1=newV1, attr1=newV1 Eg: ALTER timeseries root.turbine.d1.s1 DROP tag1, tag2 Eg: ALTER timeseries root.turbine.d1.s1 ADD TAGS tag3=v3, tag4=v4 Eg: ALTER timeseries root.turbine.d1.s1 ADD ATTRIBUTES attr3=v3, attr4=v4 +EG: ALTER timeseries root.turbine.d1.s1 UPSERT TAGS(tag2=newV2, tag3=v3) ATTRIBUTES(attr3=v3, attr4=v4) ``` * Show All Timeseries Statement diff --git a/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md b/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md index b45956f..867f120 100644 --- a/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md +++ b/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md @@ -109,6 +109,11 @@ ALTER timeseries root.turbine.d1.s1 ADD TAGS tag3=v3, tag4=v4 ``` ALTER timeseries root.turbine.d1.s1 ADD ATTRIBUTES attr3=v3, attr4=v4 ``` +* 更新插入标签和属性 +> 如果该标签或属性原来不存在,则插入,否则,用新值更新原来的旧值 +``` +ALTER timeseries root.turbine.d1.s1 UPSERT TAGS(tag2=newV2, tag3=v3) ATTRIBUTES(attr3=v3, attr4=v4) +``` ## 查看时间序列 diff --git a/docs/zh/UserGuide/5-Operation Manual/4-SQL Reference.md b/docs/zh/UserGuide/5-Operation Manual/4-SQL 
Reference.md index e58ba19..e9ab162 100644 --- a/docs/zh/UserGuide/5-Operation Manual/4-SQL Reference.md +++ b/docs/zh/UserGuide/5-Operation Manual/4-SQL Reference.md @@ -103,12 +103,20 @@ alterClause | DROP ID (COMMA ID)* | ADD TAGS property (COMMA property)* | ADD ATTRIBUTES property (COMMA property)* + | UPSERT tagClause attributeClause + ; +attributeClause + : (ATTRIBUTES LR_BRACKET property (COMMA property)* RR_BRACKET)? + ; +tagClause + : (TAGS LR_BRACKET property (COMMA property)* RR_BRACKET)? ; Eg: ALTER timeseries root.turbine.d1.s1 RENAME tag1 TO newTag1 Eg: ALTER timeseries root.turbine.d1.s1 SET tag1=newV1, attr1=newV1 Eg: ALTER timeseries root.turbine.d1.s1 DROP tag1, tag2 Eg: ALTER timeseries root.turbine.d1.s1 ADD TAGS tag3=v3, tag4=v4 Eg: ALTER timeseries root.turbine.d1.s1 ADD ATTRIBUTES attr3=v3, attr4=v4 +EG: ALTER timeseries root.turbine.d1.s1 UPSERT TAGS(tag2=newV2, tag3=v3) ATTRIBUTES(attr3=v3, attr4=v4) ``` * 显示所有时间序列语句 diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 index be8edf1..c181d38 100644 --- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 +++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 @@ -127,6 +127,7 @@ alterClause | DROP ID (COMMA ID)* | ADD TAGS property (COMMA property)* | ADD ATTRIBUTES property (COMMA property)* + | UPSERT tagClause attributeClause ; attributeClauses @@ -608,6 +609,10 @@ ADD : A D D ; +UPSERT + : U P S E R T + ; + VALUES : V A L U E S ; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index a4267dd..fd4325f 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -231,10 +231,16 @@ public class MManager { tagMap = tagLogFile.readTag(config.getTagAttributeTotalSize(), offset); } - 
CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new Path(args[1]), - TSDataType.deserialize(Short.parseShort(args[2])), - TSEncoding.deserialize(Short.parseShort(args[3])), - CompressionType.deserialize(Short.parseShort(args[4])), props, tagMap, null, alias); + CreateTimeSeriesPlan plan = + new CreateTimeSeriesPlan( + new Path(args[1]), + TSDataType.deserialize(Short.parseShort(args[2])), + TSEncoding.deserialize(Short.parseShort(args[3])), + CompressionType.deserialize(Short.parseShort(args[4])), + props, + tagMap, + null, + alias); createTimeseries(plan, offset); break; @@ -289,8 +295,14 @@ public class MManager { } // create time series in MTree - LeafMNode leafMNode = mtree.createTimeseries(path, plan.getDataType(), plan.getEncoding(), - plan.getCompressor(), plan.getProps(), plan.getAlias()); + LeafMNode leafMNode = + mtree.createTimeseries( + path, + plan.getDataType(), + plan.getEncoding(), + plan.getCompressor(), + plan.getProps(), + plan.getAlias()); try { // check memory IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1); @@ -303,7 +315,8 @@ public class MManager { if (plan.getTags() != null) { // tag key, tag value for (Entry<String, String> entry : plan.getTags().entrySet()) { - tagIndex.computeIfAbsent(entry.getKey(), k -> new HashMap<>()) + tagIndex + .computeIfAbsent(entry.getKey(), k -> new HashMap<>()) .computeIfAbsent(entry.getValue(), v -> new HashSet<>()) .add(leafMNode); } @@ -363,8 +376,8 @@ public class MManager { * * @param prefixPath path to be deleted, could be root or a prefix path or a full path * @return 1. The Set contains StorageGroups that contain no more timeseries after this deletion - * files of such StorageGroups should be deleted to reclaim disk space. - * 2. The String is the deletion failed Timeseries + * files of such StorageGroups should be deleted to reclaim disk space. 2. 
The String is the + * deletion failed Timeseries */ public Pair<Set<String>, String> deleteTimeseries(String prefixPath) throws MetadataException { lock.writeLock().lock(); @@ -717,10 +730,15 @@ public class MManager { tagLogFile.read(config.getTagAttributeTotalSize(), leaf.getOffset()); pair.left.putAll(pair.right); MeasurementSchema measurementSchema = leaf.getSchema(); - res.add(new ShowTimeSeriesResult(leaf.getFullPath(), leaf.getAlias(), - getStorageGroupName(leaf.getFullPath()), measurementSchema.getType().toString(), - measurementSchema.getEncodingType().toString(), - measurementSchema.getCompressor().toString(), pair.left)); + res.add( + new ShowTimeSeriesResult( + leaf.getFullPath(), + leaf.getAlias(), + getStorageGroupName(leaf.getFullPath()), + measurementSchema.getType().toString(), + measurementSchema.getEncodingType().toString(), + measurementSchema.getCompressor().toString(), + pair.left)); if (limit != 0 || offset != 0) { count++; } @@ -766,15 +784,29 @@ public class MManager { try { if (tagFileOffset < 0) { // no tags/attributes - res.add(new ShowTimeSeriesResult(ansString[0], ansString[1], ansString[2], ansString[3], - ansString[4], ansString[5], Collections.emptyMap())); + res.add( + new ShowTimeSeriesResult( + ansString[0], + ansString[1], + ansString[2], + ansString[3], + ansString[4], + ansString[5], + Collections.emptyMap())); } else { // has tags/attributes Pair<Map<String, String>, Map<String, String>> pair = tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset); pair.left.putAll(pair.right); - res.add(new ShowTimeSeriesResult(ansString[0], ansString[1], ansString[2], ansString[3], - ansString[4], ansString[5], pair.left)); + res.add( + new ShowTimeSeriesResult( + ansString[0], + ansString[1], + ansString[2], + ansString[3], + ansString[4], + ansString[5], + pair.left)); } } catch (IOException e) { throw new MetadataException( @@ -855,12 +887,12 @@ public class MManager { /** * get device node, if the storage group is not set, 
create it when autoCreateSchema is true * - * !!!!!!Attention!!!!! - * must call the return node's readUnlock() if you call this method. + * <p>!!!!!!Attention!!!!! must call the return node's readUnlock() if you call this method. + * * @param path path */ - public MNode getDeviceNodeWithAutoCreateAndReadLock(String path, boolean autoCreateSchema, - int sgLevel) throws MetadataException { + public MNode getDeviceNodeWithAutoCreateAndReadLock( + String path, boolean autoCreateSchema, int sgLevel) throws MetadataException { lock.readLock().lock(); MNode node = null; boolean shouldSetStorageGroup; @@ -905,13 +937,10 @@ public class MManager { } } - /** - * !!!!!!Attention!!!!! - * must call the return node's readUnlock() if you call this method. - */ + /** !!!!!!Attention!!!!! must call the return node's readUnlock() if you call this method. */ public MNode getDeviceNodeWithAutoCreateAndReadLock(String path) throws MetadataException { - return getDeviceNodeWithAutoCreateAndReadLock(path, config.isAutoCreateSchemaEnabled(), - config.getDefaultStorageGroupLevel()); + return getDeviceNodeWithAutoCreateAndReadLock( + path, config.isAutoCreateSchemaEnabled(), config.getDefaultStorageGroupLevel()); } /** Get metadata in string */ @@ -947,6 +976,7 @@ public class MManager { /** * change or set the new offset of a timeseries + * * @param path timeseries * @param offset offset in the tag file */ @@ -960,7 +990,71 @@ public class MManager { } /** + * upsert tags and attributes key-value for the timeseries if the key has existed, just use the + * new value to update it. 
+ * + * @param tagsMap newly added tags map + * @param attributesMap newly added attributes map + * @param fullPath timeseries + */ + public void upsertTagsAndAttributes( + Map<String, String> tagsMap, Map<String, String> attributesMap, String fullPath) + throws MetadataException, IOException { + lock.writeLock().lock(); + try { + MNode mNode = mtree.getNodeByPath(fullPath); + if (!(mNode instanceof LeafMNode)) { + throw new PathNotExistException(fullPath); + } + LeafMNode leafMNode = (LeafMNode) mNode; + // no tag or attribute, we need to add a new record in log + if (leafMNode.getOffset() < 0) { + long offset = tagLogFile.write(tagsMap, attributesMap); + logWriter.changeOffset(fullPath, offset); + leafMNode.setOffset(offset); + return; + } + + Pair<Map<String, String>, Map<String, String>> pair = + tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset()); + + for (Entry<String, String> entry : tagsMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + String beforeValue = pair.left.get(key); + pair.left.put(key, value); + // if the key has existed and the value is not equal to the new one + // we should remove before key-value from inverted index map + if (beforeValue != null && !beforeValue.equals(value)) { + tagIndex.get(key).get(beforeValue).remove(leafMNode); + if (tagIndex.get(key).get(beforeValue).isEmpty()) { + tagIndex.get(key).remove(beforeValue); + } + } + + // if the key doesn't exist or the value is not equal to the new one + // we should add a new key-value to inverted index map + if (beforeValue == null || !beforeValue.equals(value)) { + tagIndex + .computeIfAbsent(key, k -> new HashMap<>()) + .computeIfAbsent(value, v -> new HashSet<>()) + .add(leafMNode); + } + } + pair.left.putAll(tagsMap); + pair.right.putAll(attributesMap); + + // persist the change to disk + tagLogFile.write(pair.left, pair.right, leafMNode.getOffset()); + + } finally { + lock.writeLock().unlock(); + } + } + + /** * add new 
attributes key-value for the timeseries + * * @param attributesMap newly added attributes map * @param fullPath timeseries */ @@ -1003,6 +1097,7 @@ public class MManager { /** * add new tags key-value for the timeseries + * * @param tagsMap newly added tags map * @param fullPath timeseries */ @@ -1054,6 +1149,7 @@ public class MManager { /** * drop tags or attributes of the timeseries + * * @param keySet tags key or attributes key * @param fullPath timeseries path */ @@ -1106,6 +1202,7 @@ public class MManager { /** * set/change the values of tags or attributes + * * @param alterMap the new tags or attributes key-value * @param fullPath timeseries */ @@ -1167,6 +1264,7 @@ public class MManager { /** * rename the tag or attribute's key of the timeseries + * * @param oldKey old key of tag or attribute * @param newKey new key of tag or attribute * @param fullPath timeseries @@ -1182,8 +1280,7 @@ public class MManager { LeafMNode leafMNode = (LeafMNode) mNode; if (leafMNode.getOffset() < 0) { throw new MetadataException( - String.format( - "TimeSeries [%s] does not have [%s] tag/attribute.", fullPath, oldKey)); + String.format("TimeSeries [%s] does not have [%s] tag/attribute.", fullPath, oldKey)); } // tags, attributes Pair<Map<String, String>, Map<String, String>> pair = @@ -1215,8 +1312,7 @@ public class MManager { tagLogFile.write(pair.left, pair.right, leafMNode.getOffset()); } else { throw new MetadataException( - String.format( - "TimeSeries [%s] does not have tag/attribute [%s].", fullPath, oldKey)); + String.format("TimeSeries [%s] does not have tag/attribute [%s].", fullPath, oldKey)); } } finally { lock.writeLock().unlock(); @@ -1257,8 +1353,12 @@ public class MManager { MNode node = nodeDeque.removeFirst(); if (node instanceof LeafMNode) { MeasurementSchema nodeSchema = ((LeafMNode) node).getSchema(); - timeseriesSchemas.add(new MeasurementSchema(node.getFullPath(), nodeSchema.getType(), - nodeSchema.getEncodingType(), nodeSchema.getCompressor())); + 
timeseriesSchemas.add( + new MeasurementSchema( + node.getFullPath(), + nodeSchema.getType(), + nodeSchema.getEncodingType(), + nodeSchema.getCompressor())); } else if (!node.getChildren().isEmpty()) { nodeDeque.addAll(node.getChildren().values()); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 5615f38..92f363e 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -18,6 +18,38 @@ */ package org.apache.iotdb.db.qp.executor; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER; +import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; +import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.IAuthorizer; import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer; @@ -46,8 +78,33 @@ import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.*; -import org.apache.iotdb.db.qp.physical.sys.*; +import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; +import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.DeletePlan; +import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.GroupByFillPlan; +import org.apache.iotdb.db.qp.physical.crud.GroupByPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.QueryPlan; +import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; +import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.CountPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan; +import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; 
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan; +import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; +import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; import org.apache.iotdb.db.query.dataset.ListDataSet; @@ -80,14 +137,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.*; - -import static org.apache.iotdb.db.conf.IoTDBConstant.*; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - public class PlanExecutor implements IPlanExecutor { private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class); @@ -672,15 +721,21 @@ public class PlanExecutor implements IPlanExecutor { registeredSeries.add(series); MeasurementSchema schema = knownSchemas.get(series); if (schema == null) { - throw new MetadataException(String.format("Can not get the schema of measurement [%s]", + throw new MetadataException( + String.format( + "Can not get the schema of measurement [%s]", chunkMetadata.getMeasurementUid())); } if (!node.hasChild(chunkMetadata.getMeasurementUid())) { - mManager.createTimeseries(series.getFullPath(), schema.getType(), - schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap()); + mManager.createTimeseries( + series.getFullPath(), + schema.getType(), + 
schema.getEncodingType(), + schema.getCompressor(), + Collections.emptyMap()); } else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) { throw new QueryProcessException( - String.format("Current Path is not leaf node. %s", series)); + String.format("Current Path is not leaf node. %s", series)); } } } @@ -1009,6 +1064,12 @@ public class PlanExecutor implements IPlanExecutor { case ADD_ATTRIBUTES: mManager.addAttributes(alterMap, path.getFullPath()); break; + case UPSERT: + mManager.upsertTagsAndAttributes( + alterTimeSeriesPlan.getTagsMap(), + alterTimeSeriesPlan.getAttributesMap(), + path.getFullPath()); + break; } } catch (MetadataException e) { throw new QueryProcessException(e); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/AlterTimeSeriesOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/AlterTimeSeriesOperator.java index 41d15ac..a72c76a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/AlterTimeSeriesOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/AlterTimeSeriesOperator.java @@ -30,8 +30,17 @@ public class AlterTimeSeriesOperator extends RootOperator { private AlterType alterType; + // used when the alterType is RENAME, SET, DROP, ADD_TAGS, ADD_ATTRIBUTES + // when the alterType is RENAME, alterMap has only one entry, key is the beforeName, value is the + // currentName + // when the alterType is DROP, only the keySet of alterMap is useful, it contains all the key + // names needed to be removed private Map<String, String> alterMap; + // used when the alterType is UPSERT + private Map<String, String> tagsMap; + private Map<String, String> attributesMap; + public AlterTimeSeriesOperator(int tokenIntType) { super(tokenIntType); operatorType = OperatorType.ALTER_TIMESERIES; @@ -61,7 +70,28 @@ public class AlterTimeSeriesOperator extends RootOperator { this.alterMap = alterMap; } + public Map<String, String> getTagsMap() { + return tagsMap; + 
} + + public void setTagsMap(Map<String, String> tagsMap) { + this.tagsMap = tagsMap; + } + + public Map<String, String> getAttributesMap() { + return attributesMap; + } + + public void setAttributesMap(Map<String, String> attributesMap) { + this.attributesMap = attributesMap; + } + public enum AlterType { - RENAME, SET, DROP, ADD_TAGS, ADD_ATTRIBUTES + RENAME, + SET, + DROP, + ADD_TAGS, + ADD_ATTRIBUTES, + UPSERT } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AlterTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AlterTimeSeriesPlan.java index 7967c96..34763e3 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AlterTimeSeriesPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AlterTimeSeriesPlan.java @@ -19,28 +19,44 @@ package org.apache.iotdb.db.qp.physical.sys; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator; +import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator.AlterType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.tsfile.read.common.Path; -import java.util.Collections; -import java.util.List; -import java.util.Map; - public class AlterTimeSeriesPlan extends PhysicalPlan { - private Path path; + private final Path path; + + private final AlterTimeSeriesOperator.AlterType alterType; - private AlterTimeSeriesOperator.AlterType alterType; + // used when the alterType is RENAME, SET, DROP, ADD_TAGS, ADD_ATTRIBUTES + // when the alterType is RENAME, alterMap has only one entry, key is the beforeName, value is the + // currentName + // when the alterType is DROP, only the keySet of alterMap is useful, it contains all the key + // names needed to be removed + private final Map<String, String> alterMap; - private Map<String, String> alterMap; + // used when the alterType is UPSERT + 
private final Map<String, String> tagsMap; + private final Map<String, String> attributesMap; - public AlterTimeSeriesPlan(Path path, AlterTimeSeriesOperator.AlterType alterType, Map<String, String> alterMap) { + public AlterTimeSeriesPlan( + Path path, + AlterType alterType, + Map<String, String> alterMap, + Map<String, String> tagsMap, + Map<String, String> attributesMap) { super(false, Operator.OperatorType.ALTER_TIMESERIES); this.path = path; this.alterType = alterType; this.alterMap = alterMap; + this.tagsMap = tagsMap; + this.attributesMap = attributesMap; } public Path getPath() { @@ -55,6 +71,14 @@ public class AlterTimeSeriesPlan extends PhysicalPlan { return alterMap; } + public Map<String, String> getTagsMap() { + return tagsMap; + } + + public Map<String, String> getAttributesMap() { + return attributesMap; + } + @Override public List<Path> getPaths() { return Collections.singletonList(path); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java index a9b069b..561ffe0 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.logical.RootOperator; import org.apache.iotdb.db.qp.logical.crud.*; import org.apache.iotdb.db.qp.logical.sys.*; +import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator.AlterType; import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType; import org.apache.iotdb.db.qp.strategy.SqlBaseParser.*; import org.apache.iotdb.db.query.fill.IFill; @@ -208,26 +209,29 @@ public class LogicalGenerator extends SqlBaseBaseListener { Map<String, String> alterMap = new HashMap<>(); // rename if (ctx.RENAME() != null) { - alterTimeSeriesOperator.setAlterType(AlterTimeSeriesOperator.AlterType.RENAME); + 
alterTimeSeriesOperator.setAlterType(AlterType.RENAME); alterMap.put(ctx.beforeName.getText(), ctx.currentName.getText()); } else if (ctx.SET() != null) { // set - alterTimeSeriesOperator.setAlterType(AlterTimeSeriesOperator.AlterType.SET); + alterTimeSeriesOperator.setAlterType(AlterType.SET); setMap(ctx, alterMap); } else if (ctx.DROP() != null) { // drop - alterTimeSeriesOperator.setAlterType(AlterTimeSeriesOperator.AlterType.DROP); + alterTimeSeriesOperator.setAlterType(AlterType.DROP); for (TerminalNode dropId : ctx.ID()) { alterMap.put(dropId.getText(), null); } } else if (ctx.TAGS() != null) { // add tag - alterTimeSeriesOperator.setAlterType(AlterTimeSeriesOperator.AlterType.ADD_TAGS); + alterTimeSeriesOperator.setAlterType(AlterType.ADD_TAGS); setMap(ctx, alterMap); - } else { + } else if (ctx.ATTRIBUTES() != null){ // add attribute - alterTimeSeriesOperator.setAlterType(AlterTimeSeriesOperator.AlterType.ADD_ATTRIBUTES); + alterTimeSeriesOperator.setAlterType(AlterType.ADD_ATTRIBUTES); setMap(ctx, alterMap); + } else { + // upsert + alterTimeSeriesOperator.setAlterType(AlterType.UPSERT); } alterTimeSeriesOperator.setAlterMap(alterMap); initializedOperator = alterTimeSeriesOperator; @@ -930,32 +934,32 @@ public class LogicalGenerator extends SqlBaseBaseListener { @Override public void enterAttributeClause(AttributeClauseContext ctx) { super.enterAttributeClause(ctx); - List<PropertyContext> attributesList = ctx.property(); - String value; - Map<String, String> attributes = new HashMap<>(attributesList.size()); - if (ctx.property(0) != null) { - for (PropertyContext property : attributesList) { - if(property.propertyValue().STRING_LITERAL() != null) { - value = removeStringQuote(property.propertyValue().getText()); - } else { - value = property.propertyValue().getText(); - - } - attributes.put(property.ID().getText(), value); - } + Map<String, String> attributes = extractMap(ctx.property(), ctx.property(0)); + if (createTimeSeriesOperator != null) { + 
createTimeSeriesOperator.setAttributes(attributes); + } else if (alterTimeSeriesOperator != null) { + alterTimeSeriesOperator.setAttributesMap(attributes); } - createTimeSeriesOperator.setAttributes(attributes); } @Override public void enterTagClause(TagClauseContext ctx) { super.enterTagClause(ctx); - List<PropertyContext> tagsList = ctx.property(); + Map<String, String> tags = extractMap(ctx.property(), ctx.property(0)); + if (createTimeSeriesOperator != null) { + createTimeSeriesOperator.setTags(tags); + } else if (alterTimeSeriesOperator != null) { + alterTimeSeriesOperator.setTagsMap(tags); + } + } + + private Map<String, String> extractMap(List<PropertyContext> property2, + PropertyContext property3) { String value; - Map<String, String> tags = new HashMap<>(tagsList.size()); - if (ctx.property(0) != null) { - for (PropertyContext property : tagsList) { - if(property.propertyValue().STRING_LITERAL() != null) { + Map<String, String> tags = new HashMap<>(property2.size()); + if (property3 != null) { + for (PropertyContext property : property2) { + if (property.propertyValue().STRING_LITERAL() != null) { value = removeStringQuote(property.propertyValue().getText()); } else { value = property.propertyValue().getText(); @@ -963,7 +967,7 @@ public class LogicalGenerator extends SqlBaseBaseListener { tags.put(property.ID().getText(), value); } } - createTimeSeriesOperator.setTags(tags); + return tags; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 8f27c54..72e66b4 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -107,7 +107,9 @@ public class PhysicalGenerator { return new AlterTimeSeriesPlan( alterTimeSeriesOperator.getPath(), alterTimeSeriesOperator.getAlterType(), - alterTimeSeriesOperator.getAlterMap()); 
+ alterTimeSeriesOperator.getAlterMap(), + alterTimeSeriesOperator.getTagsMap(), + alterTimeSeriesOperator.getAttributesMap()); case DELETE: DeleteDataOperator delete = (DeleteDataOperator) operator; paths = delete.getSelectedPaths(); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTagAlterIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTagAlterIT.java index 4ec5888..ff77f49 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTagAlterIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTagAlterIT.java @@ -18,18 +18,20 @@ */ package org.apache.iotdb.db.integration; -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.jdbc.Config; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; - -import static org.junit.Assert.*; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; public class IoTDBTagAlterIT { @@ -362,4 +364,92 @@ public class IoTDBTagAlterIT { } } + @Test + public void upsertTest() throws ClassNotFoundException { + String[] ret = {"root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,v1,v2,v1,v2"}; + String[] ret2 = {"root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,v1,v2,v1,newV2,v3"}; + String[] ret3 = {"root.turbine.d1.s1,temperature,root.turbine,FLOAT,RLE,SNAPPY,newA1,v2,v3,newV1,newV2,newV3"}; + + + String sql = "create timeseries root.turbine.d1.s1(temperature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY " + + "tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2)"; + Class.forName(Config.JDBC_DRIVER_NAME); 
+ try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + statement.execute(sql); + boolean hasResult = statement.execute("show timeseries"); + assertTrue(hasResult); + ResultSet resultSet = statement.getResultSet(); + int count = 0; + while (resultSet.next()) { + String ans = resultSet.getString("timeseries") + + "," + resultSet.getString("alias") + + "," + resultSet.getString("storage group") + + "," + resultSet.getString("dataType") + + "," + resultSet.getString("encoding") + + "," + resultSet.getString("compression") + + "," + resultSet.getString("attr1") + + "," + resultSet.getString("attr2") + + "," + resultSet.getString("tag1") + + "," + resultSet.getString("tag2"); + assertEquals(ret[count], ans); + count++; + } + assertEquals(ret.length, count); + + statement.execute("ALTER timeseries root.turbine.d1.s1 UPSERT TAGS(tag3=v3, tag2=newV2)"); + hasResult = statement.execute("show timeseries where tag3=v3"); + assertTrue(hasResult); + resultSet = statement.getResultSet(); + count = 0; + while (resultSet.next()) { + String ans = resultSet.getString("timeseries") + + "," + resultSet.getString("alias") + + "," + resultSet.getString("storage group") + + "," + resultSet.getString("dataType") + + "," + resultSet.getString("encoding") + + "," + resultSet.getString("compression") + + "," + resultSet.getString("attr1") + + "," + resultSet.getString("attr2") + + "," + resultSet.getString("tag1") + + "," + resultSet.getString("tag2") + + "," + resultSet.getString("tag3"); + assertEquals(ret2[count], ans); + count++; + } + assertEquals(ret2.length, count); + + statement.execute("ALTER timeseries root.turbine.d1.s1 UPSERT TAGS(tag1=newV1, tag3=newV3) ATTRIBUTES(attr1=newA1, attr3=v3)"); + hasResult = statement.execute("show timeseries where tag3=newV3"); + assertTrue(hasResult); + resultSet = statement.getResultSet(); + count = 0; + while 
(resultSet.next()) { + String ans = resultSet.getString("timeseries") + + "," + resultSet.getString("alias") + + "," + resultSet.getString("storage group") + + "," + resultSet.getString("dataType") + + "," + resultSet.getString("encoding") + + "," + resultSet.getString("compression") + + "," + resultSet.getString("attr1") + + "," + resultSet.getString("attr2") + + "," + resultSet.getString("attr3") + + "," + resultSet.getString("tag1") + + "," + resultSet.getString("tag2") + + "," + resultSet.getString("tag3"); + assertEquals(ret3[count], ans); + count++; + } + assertEquals(ret3.length, count); + + statement.execute("show timeseries where tag3=v3"); + resultSet = statement.getResultSet(); + assertFalse(resultSet.next()); + + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } }
