This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch auto_create_device_mnode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2169d1b3ccb69e2caa5d7f9185dd10ff55c6e65d Author: samperson1997 <[email protected]> AuthorDate: Sun Mar 21 23:32:06 2021 +0800 [IOTDB-1249] Serialize of device MNode with device template --- .../main/java/org/apache/iotdb/SessionExample.java | 40 ---------- .../org/apache/iotdb/VectorSessionExample.java | 11 +-- .../org/apache/iotdb/db/metadata/MManager.java | 24 +++++- .../iotdb/db/metadata/MetadataOperationType.java | 1 + .../iotdb/db/metadata/logfile/MLogWriter.java | 6 ++ .../apache/iotdb/db/qp/executor/PlanExecutor.java | 10 ++- .../org/apache/iotdb/db/qp/logical/Operator.java | 1 + .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 6 ++ .../qp/physical/sys/AutoCreateDeviceMNodePlan.java | 87 ++++++++++++++++++++++ .../iotdb/db/metadata/MManagerImproveTest.java | 17 +++-- .../iotdb/db/qp/physical/InsertTabletPlanTest.java | 61 ++++++++++++--- 11 files changed, 192 insertions(+), 72 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 93c2176..470256f 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -30,7 +30,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; import java.util.ArrayList; import java.util.HashMap; @@ -424,45 +423,6 @@ public class SessionExample { } } - private static void insertTabletWithAlignedTimeseries() - throws IoTDBConnectionException, StatementExecutionException { - // The schema of measurements of one device - // only measurementId and data type in MeasurementSchema take effects in Tablet - List<IMeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add( - new VectorMeasurementSchema( - new String[] {"s1", "s2"}, new TSDataType[] {TSDataType.INT64, TSDataType.INT32})); - - Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList); - - // Method 2 to add tablet data - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - - for (long time = 0; time < 100; time++) { - int row = tablet.rowSize++; - timestamps[row] = time; - - long[] sensor = (long[]) values[0]; - sensor[row] = new Random().nextLong(); - - int[] sensors = (int[]) values[1]; - sensors[row] = new Random().nextInt(); - - if (tablet.rowSize == tablet.getMaxRowNumber()) { - session.insertTablet(tablet, true); - tablet.reset(); - } - } - - if (tablet.rowSize != 0) { - session.insertTablet(tablet, true); - tablet.reset(); - } - - session.executeNonQueryStatement("flush"); - } - private static void deleteData() throws IoTDBConnectionException, StatementExecutionException { String path = ROOT_SG1_D1_S1; long deleteTime = 99; diff --git a/example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java b/example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java index 62fcc31..35263ec 100644 --- a/example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/VectorSessionExample.java @@ -98,19 +98,11 @@ public class VectorSessionExample { session.createDeviceTemplate( "template1", measurementList, dataTypeList, encodingList, compressionTypes); - session.setDeviceTemplate("template1", ROOT_SG1_D1); + session.setDeviceTemplate("template1", "root.sg1"); } private static void insertTabletWithAlignedTimeseries() throws IoTDBConnectionException, StatementExecutionException { - /* - * A Tablet example: - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - */ // The schema of measurements of one device // only measurementId and data type in MeasurementSchema take effects in Tablet List<IMeasurementSchema> schemaList = new ArrayList<>(); @@ -120,7 +112,6 @@ public class VectorSessionExample { Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList); - // Method 2 to add tablet data long[] timestamps = tablet.timestamps; Object[] values = tablet.values; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 2556063..56f0151 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -48,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan; import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; @@ -390,6 +391,10 @@ public class MManager { SetDeviceTemplatePlan setDeviceTemplatePlan = (SetDeviceTemplatePlan) plan; setDeviceTemplate(setDeviceTemplatePlan); break; + case AUTO_CREATE_DEVICE_MNODE: + AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan; + autoCreateDeviceMNode(autoCreateDeviceMNodePlan); + break; default: logger.error("Unrecognizable command {}", plan.getOperatorType()); } @@ -1257,7 +1262,8 @@ public class MManager { * @param path path */ public Pair<MNode, Template> getDeviceNodeWithAutoCreate( - PartialPath path, boolean autoCreateSchema, int sgLevel) throws MetadataException { + PartialPath path, boolean autoCreateSchema, int sgLevel) + throws IOException, MetadataException { Pair<MNode, Template> node; boolean shouldSetStorageGroup; try { @@ -1276,17 +1282,23 @@ public class MManager { setStorageGroup(storageGroupPath); } node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel); + if (!(node.left instanceof StorageGroupMNode)) { + logWriter.autoCreateDeviceMNode(new AutoCreateDeviceMNodePlan(node.left.getPartialPath())); + } return node; } catch (StorageGroupAlreadySetException e) { // ignore set storage group concurrently node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel); + if (!(node.left instanceof StorageGroupMNode)) { + logWriter.autoCreateDeviceMNode(new AutoCreateDeviceMNodePlan(node.left.getPartialPath())); + } return node; } } /** !!!!!!Attention!!!!! must call the return node's readUnlock() if you call this method. */ public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path) - throws MetadataException { + throws MetadataException, IOException { return getDeviceNodeWithAutoCreate( path, config.isAutoCreateSchemaEnabled(), config.getDefaultStorageGroupLevel()); } @@ -2022,8 +2034,8 @@ public class MManager { /** get schema for device. Attention!!! Only support insertPlan */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException { - + public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) + throws MetadataException, IOException { PartialPath deviceId = plan.getDeviceId(); String[] measurementList = plan.getMeasurements(); MeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes(); @@ -2312,4 +2324,8 @@ public class MManager { // current template must contains all measurements of upper template return upperMap.isEmpty(); } + + public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException { + mtree.getDeviceNodeWithAutoCreating(plan.getPath(), config.getDefaultStorageGroupLevel()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java index bef40ef..deb3272 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java @@ -28,6 +28,7 @@ public class MetadataOperationType { public static final String DELETE_TIMESERIES = "1"; public static final String SET_STORAGE_GROUP = "2"; public static final String CREATE_ALIGNED_TIMESERIES = "3"; + public static final String AUTO_CREATE_DEVICE_MNODE = "4"; public static final String SET_TTL = "10"; public static final String DELETE_STORAGE_GROUP = "11"; public static final String CREATE_INDEX = "31"; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java index 0c4b9ce..3f2eb45 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan; import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan; import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; @@ -167,6 +168,10 @@ public class MLogWriter implements AutoCloseable { putLog(plan); } + public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws IOException { + putLog(plan); + } + public void serializeMNode(MNode node) throws IOException { int childSize = 0; if (node.getChildren() != null) { @@ -366,6 +371,7 @@ public class MLogWriter implements AutoCloseable { createTimeseries(plan); break; case MetadataOperationType.CREATE_ALIGNED_TIMESERIES: + case MetadataOperationType.AUTO_CREATE_DEVICE_MNODE: throw new MetadataException("Impossible operation!"); case MetadataOperationType.DELETE_TIMESERIES: if (args.length > 2) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index f272bf0..7cbf4b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -1018,7 +1018,7 @@ public class PlanExecutor implements IPlanExecutor { List<ChunkGroupMetadata> chunkGroupMetadataList, Map<Path, IMeasurementSchema> knownSchemas, int sgLevel) - throws QueryProcessException, MetadataException { + throws QueryProcessException, MetadataException, IOException { if (chunkGroupMetadataList.isEmpty()) { return; } @@ -1119,7 +1119,11 @@ public class PlanExecutor implements IPlanExecutor { } protected MNode getSeriesSchemas(InsertPlan insertPlan) throws MetadataException { - return IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan); + try { + return IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan); + } catch (IOException e) { + throw new MetadataException(e); + } } private void checkFailedMeasurments(InsertPlan plan) @@ -1200,7 +1204,7 @@ public class PlanExecutor implements IPlanExecutor { throw new StorageEngineException( "failed to insert points " + failedMeasurements - + (exception != null ? (" caused by " + exception.getMessage()) : "")); + + (" caused by " + exception.getMessage())); } } catch (StorageEngineException | MetadataException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java index bc2a902..7b3658b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java @@ -136,6 +136,7 @@ public abstract class Operator { DROP_FUNCTION, CREATE_ALIGNED_TIMESERIES, CREATE_MULTI_TIMESERIES, + AUTO_CREATE_DEVICE_MNODE, CREATE_INDEX, DROP_INDEX, QUERY_INDEX, diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index 9be8654..d5b1860 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan; import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan; import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; @@ -373,6 +374,10 @@ public abstract class PhysicalPlan { plan = new SetDeviceTemplatePlan(); plan.deserialize(buffer); break; + case AUTO_CREATE_DEVICE_MNODE: + plan = new AutoCreateDeviceMNodePlan(); + plan.deserialize(buffer); + break; default: throw new IOException("unrecognized log type " + type); } @@ -421,6 +426,7 @@ public abstract class PhysicalPlan { SHOW_DEVICES, CREATE_TEMPLATE, SET_DEVICE_TEMPLATE, + AUTO_CREATE_DEVICE_MNODE, } public long getIndex() { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java new file mode 100644 index 0000000..ef7412e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.qp.physical.sys; + +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class AutoCreateDeviceMNodePlan extends PhysicalPlan { + + private static final Logger logger = LoggerFactory.getLogger(AutoCreateDeviceMNodePlan.class); + protected PartialPath path; + + public AutoCreateDeviceMNodePlan() { + super(false, Operator.OperatorType.AUTO_CREATE_DEVICE_MNODE); + } + + public AutoCreateDeviceMNodePlan(PartialPath path) { + super(false, Operator.OperatorType.AUTO_CREATE_DEVICE_MNODE); + this.path = path; + } + + public AutoCreateDeviceMNodePlan(boolean isQuery, Operator.OperatorType operatorType) { + super(isQuery, operatorType); + } + + @Override + public List<PartialPath> getPaths() { + return Collections.singletonList(path); + } + + public PartialPath getPath() { + return path; + } + + @Override + public void serialize(ByteBuffer buffer) { + buffer.put((byte) PhysicalPlanType.AUTO_CREATE_DEVICE_MNODE.ordinal()); + putString(buffer, path.getFullPath()); + buffer.putLong(index); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.write((byte) PhysicalPlanType.AUTO_CREATE_DEVICE_MNODE.ordinal()); + putString(stream, path.getFullPath()); + stream.writeLong(index); + } + + @Override + public void deserialize(ByteBuffer buffer) { + String pathString = readString(buffer); + try { + path = new PartialPath(pathString); + } catch (IllegalPathException e) { + logger.error("Failed to deserialize device {} from buffer", pathString); + } + index = buffer.getLong(); + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java index 66e88df..4b52b54 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -136,12 +137,16 @@ public class MManagerImproveTest { } private void doCacheTest(String deviceId, List<String> measurementList) throws MetadataException { - MNode node = mManager.getDeviceNodeWithAutoCreate(new PartialPath(deviceId)).left; - for (String s : measurementList) { - assertTrue(node.hasChild(s)); - MeasurementMNode measurementNode = (MeasurementMNode) node.getChild(s); - TSDataType dataType = measurementNode.getSchema().getType(); - assertEquals(TSDataType.TEXT, dataType); + try { + MNode node = mManager.getDeviceNodeWithAutoCreate(new PartialPath(deviceId)).left; + for (String s : measurementList) { + assertTrue(node.hasChild(s)); + MeasurementMNode measurementNode = (MeasurementMNode) node.getChild(s); + TSDataType dataType = measurementNode.getSchema().getType(); + assertEquals(TSDataType.TEXT, dataType); + } + } catch (IOException e) { + throw new MetadataException(e); } } diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java index 2b93f36..4bed8ef 100644 --- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java @@ -145,6 +145,28 @@ public class InsertTabletPlanTest { public void testInsertTabletPlanWithDeviceTemplate() throws QueryProcessException, MetadataException, InterruptedException, QueryFilterOptimizationException, StorageEngineException, IOException { + CreateTemplatePlan plan = getCreateTemplatePlan(); + + IoTDB.metaManager.createDeviceTemplate(plan); + IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp.d1")); + + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + + InsertTabletPlan tabletPlan = getInsertTabletPlan(); + + PlanExecutor executor = new PlanExecutor(); + executor.insertTablet(tabletPlan); + + QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1"); + QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); + Assert.assertEquals(3, dataSet.getPaths().size()); + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + Assert.assertEquals(6, record.getFields().size()); + } + } + + private CreateTemplatePlan getCreateTemplatePlan() { List<List<String>> measurementList = new ArrayList<>(); List<String> v1 = new ArrayList<>(); v1.add("s1"); @@ -186,22 +208,23 @@ public class InsertTabletPlanTest { compressionTypes.add(CompressionType.SNAPPY); } - CreateTemplatePlan plan = - new CreateTemplatePlan( - "template1", measurementList, dataTypesList, encodingList, compressionTypes); - - IoTDB.metaManager.createDeviceTemplate(plan); - IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp.d1")); + return new CreateTemplatePlan( + "template1", measurementList, dataTypesList, encodingList, compressionTypes); + } - IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + @Test + public void testInsertTabletPlanWithDeviceTemplateAndAutoCreateSchema() + throws QueryProcessException, MetadataException, InterruptedException, + QueryFilterOptimizationException, StorageEngineException, IOException { + CreateTemplatePlan plan = getCreateTemplatePlan(); + IoTDB.metaManager.createDeviceTemplate(plan); + IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp")); InsertTabletPlan tabletPlan = getInsertTabletPlan(); PlanExecutor executor = new PlanExecutor(); executor.insertTablet(tabletPlan); - System.out.println(Arrays.toString(tabletPlan.getMeasurementMNodes())); - QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1"); QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); Assert.assertEquals(3, dataSet.getPaths().size()); @@ -209,6 +232,26 @@ public class InsertTabletPlanTest { RowRecord record = dataSet.next(); Assert.assertEquals(6, record.getFields().size()); } + + // test recover + // we want to recover + EnvironmentUtils.stopDaemon(); + // wait for close + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } + EnvironmentUtils.activeDaemon(); + + queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1"); + dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT); + Assert.assertEquals(3, dataSet.getPaths().size()); + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + Assert.assertEquals(6, record.getFields().size()); + } } @Test
