This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch continue_write in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ba15f610f28624ea425d72b98ef1b1b2d5ae4491 Author: qiaojialin <[email protected]> AuthorDate: Mon Jun 1 22:04:30 2020 +0800 continue write inside InsertPlan --- .../iotdb/db/engine/memtable/AbstractMemTable.java | 4 ++ .../engine/storagegroup/StorageGroupProcessor.java | 5 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 36 ++++++++----- .../iotdb/db/qp/physical/crud/InsertPlan.java | 59 +++++++++++++++++++--- .../iotdb/db/integration/IoTDBSimpleQueryIT.java | 31 ++++++++++++ 5 files changed, 113 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 5661dda..f5bfe5e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -100,6 +100,10 @@ public abstract class AbstractMemTable implements IMemTable { public void insert(InsertPlan insertPlan) { for (int i = 0; i < insertPlan.getValues().length; i++) { + if (insertPlan.getSchemas()[i] == null) { + continue; + } + Object value = insertPlan.getValues()[i]; memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 80a915a..2f8e977 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -817,13 +817,16 @@ public class StorageGroupProcessor { node = MManager.getInstance().getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId()); String[] measurementList = plan.getMeasurements(); for (int i = 0; i < measurementList.length; i++) { + if (plan.getSchemas()[i] == null) { + continue; + } // Update cached last value with high priority MNode measurementNode = node.getChild(measurementList[i]); ((LeafMNode) measurementNode) .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime); } - } catch (MetadataException | QueryProcessException e) { + } catch (MetadataException e) { throw new WriteProcessException(e); } finally { if (node != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index a3fb017..e393040 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -873,28 +873,36 @@ public class PlanExecutor implements IPlanExecutor { for (int i = 0; i < measurementList.length; i++) { String measurement = measurementList[i]; - if (!node.hasChild(measurement)) { - if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { - throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement); + try { + if (!node.hasChild(measurement)) { + if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement); + } + TSDataType dataType = TypeInferenceUtils + .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType()); + Path path = new Path(deviceId, measurement); + internalCreateTimeseries(path.toString(), dataType); } - TSDataType dataType = TypeInferenceUtils - .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType()); - Path path = new Path(deviceId, measurement); - internalCreateTimeseries(path.toString(), dataType); - } - LeafMNode measurementNode = (LeafMNode) node.getChild(measurement); - schemas[i] = measurementNode.getSchema(); - // reset measurement to common name instead of alias - measurementList[i] = measurementNode.getName(); + LeafMNode measurementNode = (LeafMNode) node.getChild(measurement); + schemas[i] = measurementNode.getSchema(); + // reset measurement to common name instead of alias + measurementList[i] = measurementNode.getName(); - if(!insertPlan.isInferType()) { - checkType(insertPlan, i, measurementNode.getSchema().getType()); + if (!insertPlan.isInferType()) { + checkType(insertPlan, i, measurementNode.getSchema().getType()); + } + } catch (MetadataException e) { + logger.warn("meet error when check {}.{}", deviceId, measurement, e); + insertPlan.markMeasurementInsertionFailed(i); } } insertPlan.setMeasurements(measurementList); insertPlan.setSchemasAndTransferType(schemas); StorageEngine.getInstance().insert(insertPlan); + if (insertPlan.getFailedMeasurements() != null) { + throw new StorageEngineException("failed to insert points " + insertPlan.getFailedMeasurements()); + } } catch (StorageEngineException | MetadataException e) { throw new QueryProcessException(e); } finally { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 116ca7a..3f7241d 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -23,8 +23,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; @@ -40,9 +42,13 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InsertPlan extends PhysicalPlan { + private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class); + private long time; private String deviceId; private String[] measurements; @@ -54,6 +60,8 @@ public class InsertPlan extends PhysicalPlan { // if inferType is true, values is String[], and infer types from them private boolean inferType = false; + // record the failed measurements + private List<String> failedMeasurements; public InsertPlan() { super(false, OperatorType.INSERT); @@ -168,12 +176,35 @@ public class InsertPlan extends PhysicalPlan { this.schemas = schemas; if (inferType) { for (int i = 0; i < schemas.length; i++) { + if (schemas[i] == null) { + continue; + } types[i] = schemas[i].getType(); - values[i] = CommonUtils.parseValue(types[i], values[i].toString()); + try { + values[i] = CommonUtils.parseValue(types[i], values[i].toString()); + } catch (Exception e) { + logger.warn("{}.{} data type is not consistent, input {}, registered {}", deviceId, + measurements[i], values[i], types[i]); + markMeasurementInsertionFailed(i); + } } } } + /** + * @param index failed measurement index + */ + public void markMeasurementInsertionFailed(int index) { + if (failedMeasurements == null) { + failedMeasurements = new ArrayList<>(); + } + failedMeasurements.add(measurements[index]); + schemas[index] = null; + measurements[index] = null; + types[index] = null; + values[index] = null; + } + @Override public List<Path> getPaths() { List<Path> ret = new ArrayList<>(); @@ -235,10 +266,12 @@ public class InsertPlan extends PhysicalPlan { putString(stream, deviceId); - stream.writeInt(measurements.length); + stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); for (String m : measurements) { - putString(stream, m); + if (m != null) { + putString(stream, m); + } } for (MeasurementSchema schema : schemas) { @@ -254,6 +287,9 @@ public class InsertPlan extends PhysicalPlan { private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException { for (int i = 0; i < values.length; i++) { + if (types[i] == null) { + continue; + } ReadWriteIOUtils.write(types[i], outputStream); switch (types[i]) { case BOOLEAN: @@ -282,6 +318,9 @@ public class InsertPlan extends PhysicalPlan { private void putValues(ByteBuffer buffer) throws QueryProcessException { for (int i = 0; i < values.length; i++) { + if (types[i] == null) { + continue; + } ReadWriteIOUtils.write(types[i], buffer); switch (types[i]) { case BOOLEAN: @@ -308,6 +347,10 @@ public class InsertPlan extends PhysicalPlan { } } + public List<String> getFailedMeasurements() { + return failedMeasurements; + } + public TSDataType[] getTypes() { return types; } @@ -352,10 +395,12 @@ public class InsertPlan extends PhysicalPlan { putString(buffer, deviceId); - buffer.putInt(measurements.length); + buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size())); - for (String m : measurements) { - putString(buffer, m); + for (String measurement : measurements) { + if (measurement != null) { + putString(buffer, measurement); + } } try { @@ -391,7 +436,7 @@ public class InsertPlan extends PhysicalPlan { return "deviceId: " + deviceId + ", time: " + time; } - public TimeValuePair composeTimeValuePair(int measurementIndex) throws QueryProcessException { + public TimeValuePair composeTimeValuePair(int measurementIndex) { if (measurementIndex >= values.length) { return null; } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java index 595c23f..cf89bc4 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.integration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; @@ -27,6 +29,7 @@ import java.sql.SQLException; import java.sql.Statement; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.jdbc.IoTDBSQLException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -170,6 +173,34 @@ public class IoTDBSimpleQueryIT { @Test + public void testPartialInsertion() throws SQLException, ClassNotFoundException { + Class.forName(Config.JDBC_DRIVER_NAME); + try(Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", + "root", "root"); + Statement statement = connection.createStatement()){ + statement.execute("SET STORAGE GROUP TO root.sg1"); + statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN"); + statement.execute("CREATE TIMESERIES root.sg1.d0.s1 WITH DATATYPE=INT32,ENCODING=PLAIN"); + + // seq chunk : [1,10] + try { + statement.execute("INSERT INTO root.sg1.d0(timestamp, s0, s1) VALUES (1, 1, 2.2)"); + fail(); + } catch (IoTDBSQLException e) { + assertTrue(e.getMessage().contains("s1")); + } + + ResultSet resultSet = statement.executeQuery("select s0, s1 from root.sg1.d0"); + + while(resultSet.next()) { + assertEquals(resultSet.getInt("root.sg1.d0.s0"), 1); + assertEquals(resultSet.getString("root.sg1.d0.s1"), null); + } + } + } + + @Test public void testOverlappedPagesMerge() throws SQLException, ClassNotFoundException { Class.forName(Config.JDBC_DRIVER_NAME); try(Connection connection = DriverManager
