This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch serialize-physicalplan in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1f626c4b66c7ad1bccbff19a2a574587bf0e0aff Author: lta <[email protected]> AuthorDate: Wed Mar 27 22:34:56 2019 +0800 add metadata plan codec --- .../iotdb/db/qp/logical/sys/AuthorOperator.java | 96 +++++- .../iotdb/db/qp/logical/sys/MetadataOperator.java | 38 ++- .../iotdb/db/qp/physical/sys/AuthorPlan.java | 30 ++ .../iotdb/db/qp/physical/sys/MetadataPlan.java | 26 ++ .../org/apache/iotdb/db/utils/ByteBufferUtils.java | 59 ++++ .../apache/iotdb/db/writelog/transfer/Codec.java | 1 + .../iotdb/db/writelog/transfer/CodecInstances.java | 330 +++++++++++++++++++++ .../db/writelog/transfer/PhysicalPlanCodec.java | 184 +----------- .../writelog/transfer/PhysicalPlanLogTransfer.java | 35 ++- .../db/writelog/transfer/SystemLogOperator.java | 6 +- .../iotdb/db/integration/IoTDBLargeDataIT.java | 2 +- .../iotdb/db/integration/IoTDBMultiSeriesIT.java | 2 +- .../transfer/PhysicalPlanLogTransferTest.java | 115 +++++++ 13 files changed, 723 insertions(+), 201 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/AuthorOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/AuthorOperator.java index b237239..65a17d4 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/AuthorOperator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/AuthorOperator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.qp.logical.sys; import org.apache.iotdb.db.qp.logical.RootOperator; +import org.apache.iotdb.db.qp.logical.sys.MetadataOperator.NamespaceType; import org.apache.iotdb.tsfile.read.common.Path; /** @@ -113,6 +114,99 @@ public class AuthorOperator extends RootOperator { public enum AuthorType { CREATE_USER, CREATE_ROLE, DROP_USER, DROP_ROLE, GRANT_ROLE, GRANT_USER, GRANT_ROLE_TO_USER, REVOKE_USER, REVOKE_ROLE, REVOKE_ROLE_FROM_USER, UPDATE_USER, LIST_USER, LIST_ROLE, - LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS + LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS; + + /** + * deserialize short number. + * + * @param i short number + * @return NamespaceType + */ + public static AuthorType deserialize(short i) { + switch (i) { + case 0: + return CREATE_USER; + case 1: + return CREATE_ROLE; + case 2: + return DROP_USER; + case 3: + return DROP_ROLE; + case 4: + return GRANT_ROLE; + case 5: + return GRANT_USER; + case 6: + return GRANT_ROLE_TO_USER; + case 7: + return REVOKE_USER; + case 8: + return REVOKE_ROLE; + case 9: + return REVOKE_ROLE_FROM_USER; + case 10: + return UPDATE_USER; + case 11: + return LIST_USER; + case 12: + return LIST_ROLE; + case 13: + return LIST_USER_PRIVILEGE; + case 14: + return LIST_ROLE_PRIVILEGE; + case 15: + return LIST_USER_ROLES; + case 16: + return LIST_ROLE_USERS; + default: + return null; + } + } + + /** + * serialize. + * + * @return short number + */ + public short serialize() { + switch (this) { + case CREATE_USER: + return 0; + case CREATE_ROLE: + return 1; + case DROP_USER: + return 2; + case DROP_ROLE: + return 3; + case GRANT_ROLE: + return 4; + case GRANT_USER: + return 5; + case GRANT_ROLE_TO_USER: + return 6; + case REVOKE_USER: + return 7; + case REVOKE_ROLE: + return 8; + case REVOKE_ROLE_FROM_USER: + return 9; + case UPDATE_USER: + return 10; + case LIST_USER: + return 11; + case LIST_ROLE: + return 12; + case LIST_USER_PRIVILEGE: + return 13; + case LIST_ROLE_PRIVILEGE: + return 14; + case LIST_USER_ROLES: + return 15; + case LIST_ROLE_USERS: + return 16; + default: + return 0; + } + } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/MetadataOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/MetadataOperator.java index 813b0c5..057e011 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/MetadataOperator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/logical/sys/MetadataOperator.java @@ -111,6 +111,42 @@ public class MetadataOperator extends RootOperator { } public enum NamespaceType { - ADD_PATH, DELETE_PATH, SET_FILE_LEVEL + ADD_PATH, DELETE_PATH, SET_FILE_LEVEL; + + /** + * deserialize short number. + * + * @param i short number + * @return NamespaceType + */ + public static NamespaceType deserialize(short i) { + switch (i) { + case 0: + return ADD_PATH; + case 1: + return DELETE_PATH; + case 2: + return SET_FILE_LEVEL; + default: + return null; + } + } + + /** + * serialize. + * + * @return short number + */ + public short serialize() { + switch (this) { + case ADD_PATH: + return 0; + case DELETE_PATH: + return 1; + case SET_FILE_LEVEL: + default: + return 0; + } + } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java index 4641ad9..b4b108a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.entity.PrivilegeType; @@ -142,6 +143,10 @@ public class AuthorPlan extends PhysicalPlan { return permissions; } + public void setPermissions(Set<Integer> permissions) { + this.permissions = permissions; + } + public Path getNodeName() { return nodeName; } @@ -185,4 +190,29 @@ public class AuthorPlan extends PhysicalPlan { } return ret; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AuthorPlan)) { + return false; + } + AuthorPlan that = (AuthorPlan) o; + return getAuthorType() == that.getAuthorType() && + Objects.equals(getUserName(), that.getUserName()) && + Objects.equals(getRoleName(), that.getRoleName()) && + Objects.equals(getPassword(), that.getPassword()) && + Objects.equals(getNewPassword(), that.getNewPassword()) && + Objects.equals(getPermissions(), that.getPermissions()) && + Objects.equals(getNodeName(), that.getNodeName()); + } + + @Override + public int hashCode() { + return Objects + .hash(getAuthorType(), getUserName(), getRoleName(), getPassword(), getNewPassword(), + getPermissions(), getNodeName()); + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java index df0c566..f612165 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.sys; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.sys.MetadataOperator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; @@ -142,4 +143,29 @@ public class MetadataPlan extends PhysicalPlan { } return ret; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MetadataPlan)) { + return false; + } + MetadataPlan that = (MetadataPlan) o; + return getNamespaceType() == that.getNamespaceType() && + Objects.equals(getPath(), that.getPath()) && + getDataType() == that.getDataType() && + getCompressor() == that.getCompressor() && + getEncoding() == that.getEncoding() && + Objects.equals(getProps(), that.getProps()) && + Objects.equals(getDeletePathList(), that.getDeletePathList()); + } + + @Override + public int hashCode() { + return Objects + .hash(getNamespaceType(), getPath(), getDataType(), getCompressor(), getEncoding(), + getProps(), getDeletePathList()); + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/ByteBufferUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/ByteBufferUtils.java new file mode 100644 index 0000000..0035847 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/ByteBufferUtils.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.utils; + +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.utils.BytesUtils; + +public class ByteBufferUtils { + + /** + * Put a string to ByteBuffer, first put a int which represents the bytes len of string, then put + * the bytes of string to ByteBuffer + * + * @param buffer Target ByteBuffer to put a string value + * @param value String value to be put + */ + public static void putString(ByteBuffer buffer, String value) { + if(value == null){ + buffer.putInt(-1); + }else{ + byte[] vBytes = BytesUtils.stringToBytes(value); + buffer.putInt(vBytes.length); + buffer.put(vBytes); + } + } + + /** + * Read a string value from ByteBuffer, first read a int that represents the bytes len of string, + * then read bytes len value, finally transfer bytes to string + * + * @param buffer ByteBuffer to be read + * @return string value + */ + public static String readString(ByteBuffer buffer) { + int valueLen = buffer.getInt(); + if(valueLen == -1) + return null; + byte[] valueBytes = new byte[valueLen]; + buffer.get(valueBytes); + return BytesUtils.bytesToString(valueBytes); + } + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java index 90a2bc6..76c0598 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/Codec.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.writelog.transfer; import java.io.IOException; +import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; interface Codec<T extends PhysicalPlan> { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java new file mode 100644 index 0000000..6c37c17 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/CodecInstances.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.writelog.transfer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.iotdb.db.auth.AuthException; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.qp.logical.sys.AuthorOperator; +import org.apache.iotdb.db.qp.logical.sys.MetadataOperator; +import org.apache.iotdb.db.qp.physical.crud.DeletePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; +import org.apache.iotdb.db.utils.ByteBufferUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.Pair; + +public class CodecInstances { + + private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + private CodecInstances() { + } + + static final Codec<DeletePlan> deletePlanCodec = new Codec<DeletePlan>() { + ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); + + @Override + public byte[] encode(DeletePlan t) { + if (localBuffer.get() == null) { + localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); + } + + int type = SystemLogOperator.DELETE; + ByteBuffer buffer = localBuffer.get(); + buffer.clear(); + buffer.put((byte) type); + buffer.putLong(t.getDeleteTime()); + ByteBufferUtils.putString(buffer, t.getPaths().get(0).getFullPath()); + + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public DeletePlan decode(byte[] bytes) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.get(); // read and skip an int representing "type". + long time = buffer.getLong(); + + String path = ByteBufferUtils.readString(buffer); + + return new DeletePlan(time, new Path(path)); + } + }; + + static final Codec<UpdatePlan> updatePlanCodec = new Codec<UpdatePlan>() { + ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); + + @Override + public byte[] encode(UpdatePlan updatePlan) { + int type = SystemLogOperator.UPDATE; + if (localBuffer.get() == null) { + localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); + } + + ByteBuffer buffer = localBuffer.get(); + buffer.clear(); + buffer.put((byte) type); + buffer.putInt(updatePlan.getIntervals().size()); + for (Pair<Long, Long> pair : updatePlan.getIntervals()) { + buffer.putLong(pair.left); + buffer.putLong(pair.right); + } + + ByteBufferUtils.putString(buffer, updatePlan.getValue()); + ByteBufferUtils.putString(buffer, updatePlan.getPath().getFullPath()); + + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public UpdatePlan decode(byte[] bytes) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.get(); // read and skip an int representing "type" + + int timeListBytesLength = buffer.getInt(); + List<Pair<Long, Long>> timeArrayList = new ArrayList<>(timeListBytesLength); + for (int i = 0; i < timeListBytesLength; i++) { + long startTime = buffer.getLong(); + long endTime = buffer.getLong(); + timeArrayList.add(new Pair<>(startTime, endTime)); + } + + String value = ByteBufferUtils.readString(buffer); + String path = ByteBufferUtils.readString(buffer); + + return new UpdatePlan(timeArrayList, value, new Path(path)); + } + }; + + static final Codec<InsertPlan> multiInsertPlanCodec = new Codec<InsertPlan>() { + ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); + + @Override + public byte[] encode(InsertPlan plan) { + int type = SystemLogOperator.INSERT; + if (localBuffer.get() == null) { + localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); + } + ByteBuffer buffer = localBuffer.get(); + buffer.clear(); + buffer.put((byte) type); + buffer.put((byte) plan.getInsertType()); + buffer.putLong(plan.getTime()); + + ByteBufferUtils.putString(buffer, plan.getDeviceId()); + + List<String> measurementList = plan.getMeasurements(); + buffer.putInt(measurementList.size()); + for (String m : measurementList) { + ByteBufferUtils.putString(buffer, m); + } + + List<String> valueList = plan.getValues(); + buffer.putInt(valueList.size()); + for (String m : valueList) { + ByteBufferUtils.putString(buffer, m); + } + + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public InsertPlan decode(byte[] bytes) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + buffer.get(); // read and skip an int representing "type" + int insertType = buffer.get(); + long time = buffer.getLong(); + + String device = ByteBufferUtils.readString(buffer); + + int mmListLength = buffer.getInt(); + List<String> measurementsList = new ArrayList<>(mmListLength); + for (int i = 0; i < mmListLength; i++) { + measurementsList.add(ByteBufferUtils.readString(buffer)); + } + + int valueListLength = buffer.getInt(); + List<String> valuesList = new ArrayList<>(valueListLength); + for (int i = 0; i < valueListLength; i++) { + valuesList.add(ByteBufferUtils.readString(buffer)); + } + + InsertPlan ans = new InsertPlan(device, time, measurementsList, valuesList); + ans.setInsertType(insertType); + return ans; + } + }; + + static final Codec<MetadataPlan> metadataPlanCodec = new Codec<MetadataPlan>() { + ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); + + @Override + public byte[] encode(MetadataPlan plan) { + int type = SystemLogOperator.METADATA; + if (localBuffer.get() == null) { + localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); + } + ByteBuffer buffer = localBuffer.get(); + buffer.clear(); + buffer.put((byte) type); + buffer.put((byte) plan.getNamespaceType().serialize()); + buffer.put((byte) plan.getDataType().serialize()); + buffer.put((byte) plan.getCompressor().serialize()); + buffer.put((byte) plan.getEncoding().serialize()); + + String path = plan.getPath().toString(); + ByteBufferUtils.putString(buffer, path); + + List<Path> deletePathList = plan.getDeletePathList(); + if (deletePathList == null) { + buffer.putInt(-1); + } else { + buffer.putInt(deletePathList.size()); + for (Path deletePath : deletePathList) { + ByteBufferUtils.putString(buffer, deletePath.toString()); + } + } + + Map<String, String> props = plan.getProps(); + buffer.putInt(props.size()); + for (Entry<String, String> entry : props.entrySet()) { + ByteBufferUtils.putString(buffer, entry.getKey()); + ByteBufferUtils.putString(buffer, entry.getValue()); + } + + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public MetadataPlan decode(byte[] bytes) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + buffer.get(); // read and skip an int representing "type" + + MetadataOperator.NamespaceType namespaceType = MetadataOperator.NamespaceType + .deserialize(buffer.get()); + TSDataType dataType = TSDataType.deserialize(buffer.get()); + CompressionType compressor = CompressionType.deserialize(buffer.get()); + TSEncoding encoding = TSEncoding.deserialize(buffer.get()); + + String path = ByteBufferUtils.readString(buffer); + int pathListLen = buffer.getInt(); + List<Path> deletePathList = null; + if (pathListLen != -1) { + deletePathList = new ArrayList<>(pathListLen); + for (int i = 0; i < pathListLen; i++) { + deletePathList.add(new Path(ByteBufferUtils.readString(buffer))); + } + } + + int propsLen = buffer.getInt(); + Map<String, String> props = new HashMap<>(propsLen); + for (int i = 0; i < propsLen; i++) { + props.put(ByteBufferUtils.readString(buffer), ByteBufferUtils.readString(buffer)); + } + + return new MetadataPlan(namespaceType, new Path(path), dataType, compressor, encoding, props, + deletePathList); + } + }; + + static final Codec<AuthorPlan> authorPlanCodec = new Codec<AuthorPlan>() { + ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); + + @Override + public byte[] encode(AuthorPlan plan) { + int type = SystemLogOperator.AUTHOR; + if (localBuffer.get() == null) { + localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); + } + ByteBuffer buffer = localBuffer.get(); + buffer.clear(); + buffer.put((byte) type); + + int authorType = plan.getAuthorType().serialize(); + buffer.put((byte)authorType); + + ByteBufferUtils.putString(buffer, plan.getUserName()); + ByteBufferUtils.putString(buffer, plan.getRoleName()); + ByteBufferUtils.putString(buffer, plan.getPassword()); + ByteBufferUtils.putString(buffer, plan.getNewPassword()); + ByteBufferUtils.putString(buffer, plan.getNodeName().toString()); + + Set<Integer> permissions = plan.getPermissions(); + if (permissions == null) { + buffer.putInt(-1); + } else { + buffer.putInt(permissions.size()); + for (int permission : permissions) { + buffer.putInt(permission); + } + } + return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); + } + + @Override + public AuthorPlan decode(byte[] bytes) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + + buffer.get(); // read and skip an int representing "type" + + AuthorOperator.AuthorType authorType = AuthorOperator.AuthorType.deserialize(buffer.get()); + String userName = ByteBufferUtils.readString(buffer); + String roleName = ByteBufferUtils.readString(buffer); + String password = ByteBufferUtils.readString(buffer); + String newPassword = ByteBufferUtils.readString(buffer); + Path nodeName = new Path(ByteBufferUtils.readString(buffer)); + Set<Integer> permissions = null; + int permissionListLen = buffer.getInt(); + if (permissionListLen != -1) { + permissions = new HashSet<>(permissionListLen); + for(int i =0 ; i < permissionListLen; i ++) { + permissions.add(buffer.getInt()); + } + } + AuthorPlan authorPlan = null; + try { + authorPlan = new AuthorPlan(authorType, userName, roleName, password, newPassword, + null, nodeName); + } catch (AuthException e) { + /** This AuthException will never be caught if authorization parameter is null **/ + } + authorPlan.setPermissions(permissions); + return authorPlan; + } + }; + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java index ae98f38..3ce856c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java @@ -35,13 +35,13 @@ import org.apache.iotdb.tsfile.utils.Pair; public enum PhysicalPlanCodec { - MULTIINSERTPLAN(SystemLogOperator.INSERT, CodecInstances.multiInsertPlanCodec), UPDATEPLAN( - SystemLogOperator.UPDATE, - CodecInstances.updatePlanCodec), DELETEPLAN(SystemLogOperator.DELETE, - CodecInstances.deletePlanCodec); + MULTIINSERTPLAN(SystemLogOperator.INSERT, CodecInstances.multiInsertPlanCodec), + UPDATEPLAN(SystemLogOperator.UPDATE, CodecInstances.updatePlanCodec), + DELETEPLAN(SystemLogOperator.DELETE, CodecInstances.deletePlanCodec), + METADATAPLAN(SystemLogOperator.METADATA, CodecInstances.metadataPlanCodec), + AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec); private static final HashMap<Integer, PhysicalPlanCodec> codecMap = new HashMap<>(); - private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); static { for (PhysicalPlanCodec codec : PhysicalPlanCodec.values()) { @@ -64,178 +64,4 @@ public enum PhysicalPlanCodec { } return codecMap.get(opcode); } - - static class CodecInstances { - - private CodecInstances(){} - - static final Codec<DeletePlan> deletePlanCodec = new Codec<DeletePlan>() { - ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); - - @Override - public byte[] encode(DeletePlan t) { - if (localBuffer.get() == null) { - localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); - } - - int type = SystemLogOperator.DELETE; - ByteBuffer buffer = localBuffer.get(); - buffer.clear(); - buffer.put((byte) type); - buffer.putLong(t.getDeleteTime()); - byte[] pathBytes = BytesUtils.stringToBytes(t.getPaths().get(0).getFullPath()); - buffer.putInt(pathBytes.length); - buffer.put(pathBytes); - - return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); - } - - @Override - public DeletePlan decode(byte[] bytes) throws IOException { - ByteBuffer buffer = ByteBuffer.wrap(bytes); - buffer.get(); // read and skip an int representing "type". - long time = buffer.getLong(); - - int pathLength = buffer.getInt(); - byte[] pathBytes = new byte[pathLength]; - buffer.get(pathBytes, 0, pathLength); - String path = BytesUtils.bytesToString(pathBytes); - - return new DeletePlan(time, new Path(path)); - } - }; - - static final Codec<UpdatePlan> updatePlanCodec = new Codec<UpdatePlan>() { - ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); - - @Override - public byte[] encode(UpdatePlan updatePlan) { - int type = SystemLogOperator.UPDATE; - if (localBuffer.get() == null) { - localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); - } - - ByteBuffer buffer = localBuffer.get(); - buffer.clear(); - buffer.put((byte) type); - buffer.putInt(updatePlan.getIntervals().size()); - for (Pair<Long, Long> pair : updatePlan.getIntervals()) { - buffer.putLong(pair.left); - buffer.putLong(pair.right); - } - - byte[] valueBytes = BytesUtils.stringToBytes(updatePlan.getValue()); - buffer.putInt(valueBytes.length); - buffer.put(valueBytes); - - byte[] pathBytes = BytesUtils.stringToBytes(updatePlan.getPath().getFullPath()); - buffer.putInt(pathBytes.length); - buffer.put(pathBytes); - - return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); - } - - @Override - public UpdatePlan decode(byte[] bytes) throws IOException { - ByteBuffer buffer = ByteBuffer.wrap(bytes); - buffer.get(); // read and skip an int representing "type" - - int timeListBytesLength = buffer.getInt(); - List<Pair<Long, Long>> timeArrayList = new ArrayList<>(timeListBytesLength); - for (int i = 0; i < timeListBytesLength; i++) { - long startTime = buffer.getLong(); - long endTime = buffer.getLong(); - timeArrayList.add(new Pair<>(startTime, endTime)); - } - - int valueLength = buffer.getInt(); - byte[] valueBytes = new byte[valueLength]; - buffer.get(valueBytes); - String value = BytesUtils.bytesToString(valueBytes); - - int pathLength = buffer.getInt(); - byte[] pathBytes = new byte[pathLength]; - buffer.get(pathBytes); - String path = BytesUtils.bytesToString(pathBytes); - - return new UpdatePlan(timeArrayList, value, new Path(path)); - } - }; - - static final Codec<InsertPlan> multiInsertPlanCodec = new Codec<InsertPlan>() { - ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>(); - - @Override - public byte[] encode(InsertPlan plan) { - int type = SystemLogOperator.INSERT; - if (localBuffer.get() == null) { - localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize())); - } - ByteBuffer buffer = localBuffer.get(); - buffer.clear(); - buffer.put((byte) type); - buffer.put((byte) plan.getInsertType()); - buffer.putLong(plan.getTime()); - - byte[] deviceBytes = BytesUtils.stringToBytes(plan.getDeviceId()); - buffer.putInt(deviceBytes.length); - buffer.put(deviceBytes); - - List<String> measurementList = plan.getMeasurements(); - buffer.putInt(measurementList.size()); - for (String m : measurementList) { - byte[] mBytes = BytesUtils.stringToBytes(m); - buffer.putInt(mBytes.length); - buffer.put(mBytes); - } - - List<String> valueList = plan.getValues(); - buffer.putInt(valueList.size()); - for (String m : valueList) { - byte[] vBytes = BytesUtils.stringToBytes(m); - buffer.putInt(vBytes.length); - buffer.put(vBytes); - } - - return Arrays.copyOfRange(buffer.array(), 0, buffer.position()); - } - - @Override - public InsertPlan decode(byte[] bytes) throws IOException { - ByteBuffer buffer = ByteBuffer.wrap(bytes); - - buffer.get(); // read and skip an int representing "type" - int insertType = buffer.get(); - long time = buffer.getLong(); - - int deltaObjLen = buffer.getInt(); - byte[] deltaObjBytes = new byte[deltaObjLen]; - buffer.get(deltaObjBytes); - String device = BytesUtils.bytesToString(deltaObjBytes); - - int mmListLength = buffer.getInt(); - List<String> measurementsList = new ArrayList<>(mmListLength); - for (int i = 0; i < mmListLength; i++) { - int mmLen = buffer.getInt(); - byte[] mmBytes = new byte[mmLen]; - buffer.get(mmBytes); - measurementsList.add(BytesUtils.bytesToString(mmBytes)); - } - - int valueListLength = buffer.getInt(); - List<String> valuesList = new ArrayList<>(valueListLength); - for (int i = 0; i < valueListLength; i++) { - int valueLen = buffer.getInt(); - byte[] valueBytes = new byte[valueLen]; - buffer.get(valueBytes); - valuesList.add(BytesUtils.bytesToString(valueBytes)); - } - - InsertPlan ans = new InsertPlan(device, time, measurementsList, valuesList); - ans.setInsertType(insertType); - return ans; - } - }; - - } } \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java index 76a6e46..22ef179 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java @@ -20,28 +20,35 @@ package org.apache.iotdb.db.writelog.transfer; import java.io.IOException; import java.nio.BufferOverflowException; +import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.exception.WALOverSizedException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.DeletePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; public class PhysicalPlanLogTransfer { - private PhysicalPlanLogTransfer(){} + private PhysicalPlanLogTransfer() { + } public static byte[] operatorToLog(PhysicalPlan plan) throws IOException { Codec<PhysicalPlan> codec; - switch (plan.getOperatorType()) { - case INSERT: - codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.INSERT).codec; - break; - case UPDATE: - codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.UPDATE).codec; - break; - case DELETE: - codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.DELETE).codec; - break; - default: - throw new UnsupportedOperationException( - "SystemLogOperator given is not supported. " + plan.getOperatorType()); + if (plan instanceof InsertPlan) { + codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.INSERT).codec; + } else if (plan instanceof UpdatePlan) { + codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.UPDATE).codec; + } else if (plan instanceof DeletePlan) { + codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.DELETE).codec; + } else if (plan instanceof MetadataPlan) { + codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.METADATA).codec; + } else if (plan instanceof AuthorPlan) { + codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.AUTHOR).codec; + } else { + throw new UnsupportedOperationException( + "SystemLogOperator given is not supported. " + plan.getOperatorType()); } try { return codec.encode(plan); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java index b8ed50f..ebc46e3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/SystemLogOperator.java @@ -28,8 +28,6 @@ public class SystemLogOperator { public static final int INSERT = 0; public static final int UPDATE = 1; public static final int DELETE = 2; - public static final int OVERFLOWFLUSHSTART = 3; - public static final int OVERFLOWFLUSHEND = 4; - public static final int BUFFERFLUSHSTART = 5; - public static final int BUFFERFLUSHEND = 6; + public static final int METADATA = 3; + public static final int AUTHOR = 4; } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java index df2902e..fc2bba4 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java @@ -362,7 +362,7 @@ public class IoTDBLargeDataIT { if (time > 200900) { assertEquals("7777", value); } - // String ans = resultSet.getString(d0s1); + // String ans = resultSet.readString(d0s1); cnt++; } assertEquals(22800, cnt); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java index f97e6c1..0c81171 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java @@ -359,7 +359,7 @@ public class IoTDBMultiSeriesIT { if (time > 200900) { assertEquals("7777", value); } - // String ans = resultSet.getString(d0s1); + // String ans = resultSet.readString(d0s1); cnt++; } assertEquals(22800, cnt); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java new file mode 100644 index 0000000..d91496c --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransferTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.writelog.transfer; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.iotdb.db.auth.AuthException; +import org.apache.iotdb.db.exception.ArgsErrorException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.exception.qp.QueryProcessorException; +import org.apache.iotdb.db.qp.QueryProcessor; +import org.apache.iotdb.db.qp.physical.crud.DeletePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.MetadataPlan; +import org.apache.iotdb.db.qp.utils.MemIntQpExecutor; +import org.apache.iotdb.tsfile.read.common.Path; +import org.junit.Test; + +public class PhysicalPlanLogTransferTest { + + private QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor()); + private InsertPlan insertPlan = new InsertPlan(1, "device", 100, + Arrays.asList("s1", "s2", "s3", "s4"), Arrays.asList("0.1", "100", "test", "false")); + private DeletePlan deletePlan = new DeletePlan(50, new Path("root.vehicle.device")); + private UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", + new Path("root.vehicle.device.sensor")); + + @Test + public void operatorToLog() + throws IOException, ArgsErrorException, ProcessorException, QueryProcessorException { + byte[] insertPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(insertPlan); + Codec<InsertPlan> insertPlanCodec = CodecInstances.multiInsertPlanCodec; + byte[] insertPlanProperty = insertPlanCodec.encode(insertPlan); + assertEquals(true, Arrays.equals(insertPlanProperty, insertPlanBytesTest)); + + byte[] deletePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(deletePlan); + Codec<DeletePlan> deletePlanCodec = CodecInstances.deletePlanCodec; + byte[] deletePlanProperty = deletePlanCodec.encode(deletePlan); + assertEquals(true, Arrays.equals(deletePlanProperty, deletePlanBytesTest)); + + byte[] updatePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(updatePlan); + Codec<UpdatePlan> updatePlanCodec = CodecInstances.updatePlanCodec; + byte[] updatePlanProperty = updatePlanCodec.encode(updatePlan); + assertEquals(true, Arrays.equals(updatePlanProperty, updatePlanBytesTest)); + + String metadataStatement = "create timeseries root.vehicle.d1.s1 with datatype=INT32,encoding=RLE"; + MetadataPlan metadataPlan = (MetadataPlan) processor.parseSQLToPhysicalPlan(metadataStatement); + byte[] metadataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(metadataPlan); + Codec<MetadataPlan> metadataPlanCodec = CodecInstances.metadataPlanCodec; + byte[] metadataPlanProperty = metadataPlanCodec.encode(metadataPlan); + assertEquals(true, Arrays.equals(metadataPlanProperty, metadataPlanBytesTest)); + + String sql = "grant role xm privileges 'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor"; + AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql); + byte[] authorPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(authorPlan); + Codec<AuthorPlan> authorPlanCodec = CodecInstances.authorPlanCodec; + byte[] authorPlanProperty = authorPlanCodec.encode(authorPlan); + assertEquals(true, Arrays.equals(authorPlanProperty, authorPlanBytesTest)); + + } + + @Test + public void logToOperator() + throws IOException, ArgsErrorException, ProcessorException, QueryProcessorException, AuthException { + + byte[] insertPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(insertPlan); + InsertPlan insertPlanTest = (InsertPlan) PhysicalPlanLogTransfer + .logToOperator(insertPlanBytesTest); + assertEquals(true, insertPlanTest.equals(insertPlan)); + + byte[] deletePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(deletePlan); + DeletePlan deletePlanTest = (DeletePlan) PhysicalPlanLogTransfer + .logToOperator(deletePlanBytesTest); + assertEquals(true, deletePlanTest.equals(deletePlan)); + + byte[] updatePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(updatePlan); + UpdatePlan updatePlanTest = (UpdatePlan) PhysicalPlanLogTransfer + .logToOperator(updatePlanBytesTest); + assertEquals(true, updatePlanTest.equals(updatePlan)); + + String metadataStatement = "create timeseries root.vehicle.d1.s1 with datatype=INT32,encoding=RLE"; + MetadataPlan metadataPlan = (MetadataPlan) processor.parseSQLToPhysicalPlan(metadataStatement); + byte[] metadataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(metadataPlan); + MetadataPlan metadataPlanTest = (MetadataPlan) PhysicalPlanLogTransfer + .logToOperator(metadataPlanBytesTest); + assertEquals(true, metadataPlanTest.equals(metadataPlan)); + + String sql = "grant role xm privileges 'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor"; + AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql); + byte[] authorPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(authorPlan); + AuthorPlan authorPlanTest = (AuthorPlan) PhysicalPlanLogTransfer.logToOperator(authorPlanBytesTest); + assertEquals(true, authorPlanTest.equals(authorPlan)); + + } +} \ No newline at end of file
