This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira-2000 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 302241c1cca1b763a85c14510dc3ed1edf1269ef Author: HTHou <[email protected]> AuthorDate: Fri Nov 12 17:22:24 2021 +0800 [IOTDB-2000] DataMigrationExample should migrate data device by device --- .../org/apache/iotdb/DataMigrationExample.java | 107 ++++++++++++++------- 1 file changed, 70 insertions(+), 37 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java index 2959f1e..ee48154 100644 --- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java +++ b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java @@ -26,6 +26,7 @@ import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; @@ -87,14 +88,30 @@ public class DataMigrationExample { List<Future> futureList = new ArrayList<>(); int count = 0; + int seriesNumInOneTask = 0; + String currentDevice = ""; + List<String> measurementsInCurrentDevice = new ArrayList<>(); + List<TSDataType> dataTypesInCurrentDevice = new ArrayList<>(); while (schemaIter.next()) { - count++; Path currentPath = new Path(schemaIter.getString("timeseries"), true); - Future future = - executorService.submit( - new LoadThread( - count, currentPath, TSDataType.valueOf(schemaIter.getString("dataType")))); - futureList.add(future); + if (!currentDevice.equals(currentPath.getDevice())) { + if (!currentDevice.equals("") || seriesNumInOneTask > 300) { + count++; + Future future = + executorService.submit( + new LoadThread( + count, currentDevice, measurementsInCurrentDevice, dataTypesInCurrentDevice)); + futureList.add(future); + seriesNumInOneTask = 0; + } + seriesNumInOneTask++; + currentDevice = currentPath.getDevice(); + measurementsInCurrentDevice = new ArrayList<>(); + dataTypesInCurrentDevice = new ArrayList<>(); + } + measurementsInCurrentDevice.add(currentPath.getMeasurement()); + dataTypesInCurrentDevice.add(TSDataType.valueOf(schemaIter.getString("dataType"))); + } readerPool.closeResultSet(schemaDataSet); @@ -110,57 +127,73 @@ public class DataMigrationExample { static class LoadThread implements Callable<Void> { String device; - String measurement; - Path series; - TSDataType dataType; + List<String> measurements; + List<TSDataType> dataTypes; Tablet tablet; int i; - public LoadThread(int i, Path series, TSDataType dataType) { + public LoadThread(int i, String device, List<String> measurements, List<TSDataType> dataTypes) { this.i = i; - this.device = series.getDevice(); - this.measurement = series.getMeasurement(); - this.dataType = dataType; - this.series = series; + this.device = device; + this.measurements = measurements; + this.dataTypes = dataTypes; } @Override public Void call() { List<IMeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new UnaryMeasurementSchema(measurement, dataType)); + StringBuffer measurementsString = new StringBuffer(); + for (int i = 0; i < measurements.size(); i++) { + schemaList.add(new UnaryMeasurementSchema(measurements.get(i), dataTypes.get(i))); + measurementsString.append(measurements.get(i)); + if (i != measurements.size() - 1) { + measurementsString.append(", "); + } + } tablet = new Tablet(device, schemaList, 300000); + tablet.bitMaps = new BitMap[schemaList.size()]; + for (int i = 0; i < measurements.size(); i++) { + tablet.bitMaps[i] = new BitMap(tablet.getMaxRowNumber()); + } SessionDataSetWrapper dataSet = null; try { dataSet = readerPool.executeQueryStatement( - String.format("select %s from %s", measurement, device)); + String.format("select %s from %s", measurementsString.toString(), device)); DataIterator dataIter = dataSet.iterator(); while (dataIter.next()) { int row = tablet.rowSize++; tablet.timestamps[row] = dataIter.getLong(1); - switch (dataType) { - case BOOLEAN: - ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2); - break; - case INT32: - ((int[]) tablet.values[0])[row] = dataIter.getInt(2); - break; - case INT64: - ((long[]) tablet.values[0])[row] = dataIter.getLong(2); - break; - case FLOAT: - ((float[]) tablet.values[0])[row] = dataIter.getFloat(2); - break; - case DOUBLE: - ((double[]) tablet.values[0])[row] = dataIter.getDouble(2); - break; - case TEXT: - ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2)); - break; + for (int i = 0; i < measurements.size(); i++) { + if (dataIter.isNull(i + 2)) { + tablet.bitMaps[i].mark((int) row); + } + switch (dataTypes.get(i)) { + case BOOLEAN: + ((boolean[]) tablet.values[i])[row] = dataIter.getBoolean(i + 2); + break; + case INT32: + ((int[]) tablet.values[i])[row] = dataIter.getInt(i + 2); + break; + case INT64: + ((long[]) tablet.values[i])[row] = dataIter.getLong(i + 2); + break; + case FLOAT: + ((float[]) tablet.values[i])[row] = dataIter.getFloat(i + 2); + break; + case DOUBLE: + ((double[]) tablet.values[i])[row] = dataIter.getDouble(i + 2); + break; + case TEXT: + ((Binary[]) tablet.values[i])[row] = new Binary(dataIter.getString(i + 2)); + break; + default: + break; + } } if (tablet.rowSize == tablet.getMaxRowNumber()) { writerPool.insertTablet(tablet, true); @@ -174,13 +207,13 @@ public class DataMigrationExample { } catch (Exception e) { System.out.println( - "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage()); + "Loading the " + i + "-th devices: " + device + " failed " + e.getMessage()); return null; } finally { readerPool.closeResultSet(dataSet); } - System.out.println("Loading the " + i + "-th timeseries: " + series + " success"); + System.out.println("Loading the " + i + "-th devices: " + device + " success"); return null; } }
