This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 6ae135174d958d6c6cf60717af722f3ae6e6e899 Author: Ring-k <[email protected]> AuthorDate: Tue Jul 7 23:24:26 2020 +0800 auto create from InsertTablePlan --- .../cluster/server/member/MetaGroupMember.java | 32 +++++++++++----------- .../iotdb/db/qp/physical/crud/InsertPlan.java | 2 ++ .../db/qp/physical/crud/InsertTabletPlan.java | 22 +++++++++++++++ 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index d212933..f000c9a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -407,7 +407,6 @@ public class MetaGroupMember extends RaftMember { } - /** * Apply the addition of a new node. Register its identifier, add it to the node list and * partition table, serialize the partition table and update the DataGroupMembers. @@ -693,7 +692,6 @@ public class MetaGroupMember extends RaftMember { } - /** * @return Whether all nodes' identifier is known. */ @@ -722,7 +720,7 @@ public class MetaGroupMember extends RaftMember { /** * Process the join cluster request of "node". Only proceed when the partition table is ready. 
* - * @param node cannot be the local node + * @param node cannot be the local node */ public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws AddSelfException, LogExecutionException { @@ -1432,7 +1430,7 @@ public class MetaGroupMember extends RaftMember { setStorageGroupResult.getCode(), storageGroupName) ); } - if(plan instanceof InsertRowPlan){ + if (plan instanceof InsertRowPlan) { // try to create timeseries boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan); if (!isAutoCreateTimeseriesSuccess) { @@ -1451,9 +1449,9 @@ public class MetaGroupMember extends RaftMember { * @return */ TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { - InsertRowPlan backup = null; - if (plan instanceof InsertRowPlan) { - backup = (InsertRowPlan) ((InsertRowPlan) plan).clone(); + InsertPlan backup = null; + if (plan instanceof InsertPlan) { + backup = (InsertPlan) ((InsertPlan) plan).clone(); } // the error codes from the groups that cannot execute the plan TSStatus status; @@ -1466,7 +1464,7 @@ public class MetaGroupMember extends RaftMember { status = forwardToMultipleGroup(planGroupMap); } } - if (plan instanceof InsertRowPlan + if (plan instanceof InsertPlan && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode() && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { // try to create timeseries @@ -1607,7 +1605,7 @@ public class MetaGroupMember extends RaftMember { * @param insertPlan, some of the timeseries in it are not created yet * @return true of all uncreated timeseries are created */ - boolean autoCreateTimeseries(InsertRowPlan insertPlan) { + boolean autoCreateTimeseries(InsertPlan insertPlan) { List<String> seriesList = new ArrayList<>(); String deviceId = insertPlan.getDeviceId(); String storageGroupName; @@ -1629,7 +1627,9 @@ public class MetaGroupMember extends RaftMember { for (String seriesPath : unregisteredSeriesList) { int index = 
seriesList.indexOf(seriesPath); TSDataType dataType = TypeInferenceUtils - .getPredictedDataType(insertPlan.getValues()[index], true); + .getPredictedDataType(insertPlan instanceof InsertTabletPlan + ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0] + : ((InsertRowPlan) insertPlan).getValues()[index], true); TSEncoding encoding = getDefaultEncoding(dataType); CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath), @@ -1824,7 +1824,8 @@ public class MetaGroupMember extends RaftMember { if (logger.isDebugEnabled()) { logger.debug("{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}", name, - schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1, node, + schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1, + node, request.getHeader()); } results.addAll(schemas); @@ -2654,16 +2655,16 @@ public class MetaGroupMember extends RaftMember { } - /** * Process the request of removing a node from the cluster. Reject the request if partition table * is unavailable or the node is not the MetaLeader and it does not know who the leader is. * Otherwise (being the MetaLeader), the request will be processed locally and broadcast to every * node. * - * @param node the node to be removed. + * @param node the node to be removed. */ - public long removeNode(Node node) throws PartitionTableUnavailableException, LogExecutionException { + public long removeNode(Node node) + throws PartitionTableUnavailableException, LogExecutionException { if (partitionTable == null) { logger.info("Cannot add node now because the partition table is not set"); throw new PartitionTableUnavailableException(thisNode); @@ -2675,13 +2676,12 @@ public class MetaGroupMember extends RaftMember { } - /** * Process a node removal request locally and broadcast it to the whole cluster. 
The removal will * be rejected if number of nodes will fall below half of the replication number after this * operation. * - * @param node the node to be removed. + * @param node the node to be removed. * @return Long.MIN_VALUE if further forwarding is required, or the execution result */ private long processRemoveNodeLocally(Node node) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 3b4222c..5c81cdc 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -95,4 +95,6 @@ public abstract class InsertPlan extends PhysicalPlan { dataTypes[index] = null; } + @Override + public abstract Object clone(); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java index f7b59db..dd6b5d5 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java @@ -482,4 +482,26 @@ public class InsertTabletPlan extends InsertPlan { columns[index] = null; } + @Override + public Object clone() { + String deviceIdClone = deviceId; + String[] measurementsClone = new String[this.measurements.length]; + System.arraycopy(this.measurements, 0, measurementsClone, 0, measurementsClone.length); + TSDataType[] typesClone = new TSDataType[this.dataTypes.length]; + System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length); + InsertTabletPlan ret = new InsertTabletPlan(deviceIdClone, measurementsClone); + + ret.setDataTypes(typesClone); + + long[] timesClone = new long[times.length]; + System.arraycopy(this.times, 0, timesClone, 0, times.length); + ret.setTimes(timesClone); + Object[] columnsClone = new Object[columns.length]; + 
System.arraycopy(this.columns, 0, columnsClone, 0, columns.length); + ret.setColumns(columnsClone); + + ret.setRowCount(rowCount); + return ret; + } + }
