This is an automated email from the ASF dual-hosted git repository.
kangrong pushed a commit to branch f_index_dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/f_index_dev by this push:
new 090d398 完成:展示实验
090d398 is described below
commit 090d39804a77ab0d6fa6ea6d10e5b7c881230818
Author: kr11 <3095717866.com>
AuthorDate: Fri May 14 04:39:29 2021 +0800
完成:展示实验
---
.../iotdb/db/index/algorithm/RTreeIndex.java | 51 ++++++-
.../iotdb/db/index/algorithm/mmhh/MMHHIndex.java | 78 ++++++++++-
.../org/apache/iotdb/db/index/IndexTestUtils.java | 2 +-
.../it/{ELBWindIT.java => DemoELBWindIT.java} | 37 +++--
.../it/{ELBWindIT.java => DemoMMHHWindIT.java} | 156 +++++++++++++++------
.../it/{ELBWindIT.java => DemoRTreeWindIT.java} | 132 +++++++++++------
.../apache/zeppelin/iotdb/IoTDBInterpreter.java | 42 ++++++
.../zeppelin/iotdb/IoTDBInterpreterTest.java | 52 +++++++
8 files changed, 446 insertions(+), 104 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
index a9f0ac5..653b1c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/algorithm/RTreeIndex.java
@@ -61,9 +61,11 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -244,6 +246,8 @@ public abstract class RTreeIndex extends IoTDBIndex {
currentInsertPath = null;
}
+ private Set<PartialPath> involvedPathSet = new HashSet<>();
+
@Override
public boolean buildNext() {
fillCurrentFeature();
@@ -252,6 +256,7 @@ public abstract class RTreeIndex extends IoTDBIndex {
} else {
rTree.insert(currentLowerBounds, currentUpperBounds, currentInsertPath);
}
+ involvedPathSet.add(currentInsertPath);
return true;
}
@@ -511,7 +516,18 @@ public abstract class RTreeIndex extends IoTDBIndex {
if (props.containsKey(NO_PRUNE)) {
return noPruneQuery(queryProps, iIndexUsable, context,
candidateOrderOptimize, alignedByTime);
}
+ List<PartialPath> uninvolvedList;
+ try {
+ uninvolvedList =
MManager.getInstance().getAllTimeseriesPath(indexSeries);
+ uninvolvedList.removeAll(involvedPathSet);
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new QueryIndexException(e.getMessage());
+ }
+ Function<PartialPath, TVList> loadSeriesFunc =
+ getLoadSeriesFunc(context, tsDataType, createQueryFeatureExtractor());
+ BiFunction<double[], TVList, Double> exactDistFunc =
getCalcExactDistFunc();
RTreeQueryStruct struct = initQuery(queryProps);
List<DistSeries> res;
res =
@@ -521,8 +537,39 @@ public abstract class RTreeIndex extends IoTDBIndex {
struct.patternFeatures,
((WholeMatchIndexUsability) iIndexUsable).getUnusableRange(),
getCalcLowerDistFunc(),
- getCalcExactDistFunc(),
- getLoadSeriesFunc(context, tsDataType,
createQueryFeatureExtractor()));
+ exactDistFunc,
+ loadSeriesFunc);
+
+ if (!uninvolvedList.isEmpty()) {
+ PriorityQueue<DistSeries> topKPQ =
+ new PriorityQueue<>(struct.topK, new DistSeriesComparator());
+ topKPQ.addAll(res);
+ double kthMinDist = topKPQ.isEmpty() ? Double.MAX_VALUE :
topKPQ.peek().dist;
+ for (PartialPath path : uninvolvedList) {
+ TVList tvList = loadSeriesFunc.apply(path);
+ double tempDist = exactDistFunc.apply(struct.patterns, tvList);
+ if (topKPQ.size() < struct.topK || tempDist < kthMinDist) {
+ if (topKPQ.size() == struct.topK) {
+ topKPQ.poll();
+ }
+ topKPQ.add(new DistSeries(tempDist, tvList, path));
+ kthMinDist = topKPQ.peek().dist;
+ }
+ }
+ if (topKPQ.isEmpty()) {
+ res = Collections.emptyList();
+ } else {
+ int retSize = Math.min(struct.topK, topKPQ.size());
+ DistSeries[] resArray = new DistSeries[retSize];
+ int idx = retSize - 1;
+ while (!topKPQ.isEmpty()) {
+ DistSeries distSeries = topKPQ.poll();
+ resArray[idx--] = distSeries;
+ }
+ res = Arrays.asList(resArray);
+ }
+ }
+
for (DistSeries ds : res) {
ds.partialPath = ds.partialPath.concatNode(String.format("(D=%.2f)",
ds.dist));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
index 101cf2a..8367b95 100644
---
a/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/index/algorithm/mmhh/MMHHIndex.java
@@ -23,7 +23,6 @@ import
org.apache.iotdb.db.exception.index.IndexManagerException;
import org.apache.iotdb.db.exception.index.QueryIndexException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.index.algorithm.IoTDBIndex;
-import org.apache.iotdb.db.index.algorithm.RTreeIndex;
import org.apache.iotdb.db.index.algorithm.rtree.RTree.DistSeriesComparator;
import org.apache.iotdb.db.index.common.DistSeries;
import org.apache.iotdb.db.index.common.IndexInfo;
@@ -33,6 +32,7 @@ import
org.apache.iotdb.db.index.feature.IndexFeatureExtractor;
import org.apache.iotdb.db.index.read.optimize.IIndexCandidateOrderOptimize;
import org.apache.iotdb.db.index.stats.IndexStatManager;
import org.apache.iotdb.db.index.usable.IIndexUsable;
+import org.apache.iotdb.db.index.usable.WholeMatchIndexUsability;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -60,12 +60,16 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.function.BiFunction;
import java.util.function.Function;
+import static org.apache.iotdb.db.index.algorithm.RTreeIndex.getLoadSeriesFunc;
import static
org.apache.iotdb.db.index.common.IndexConstant.DEFAULT_HASH_LENGTH;
import static
org.apache.iotdb.db.index.common.IndexConstant.DEFAULT_SERIES_LENGTH;
import static org.apache.iotdb.db.index.common.IndexConstant.HASH_LENGTH;
@@ -147,6 +151,8 @@ public class MMHHIndex extends IoTDBIndex {
currentInsertPath = null;
}
+ private Set<PartialPath> involvedPathSet = new HashSet<>();
+
@Override
public boolean buildNext() throws IndexManagerException {
Long key = mmhhFeatureExtractor.getCurrent_L3_Feature();
@@ -159,6 +165,7 @@ public class MMHHIndex extends IoTDBIndex {
// .format("Input record: %s, pathId %d, hash %d, series: %s",
currentInsertPath, pathId,
// key,
// mmhhFeatureExtractor.getCurrent_L2_AlignedSequence()));
+ involvedPathSet.add(currentInsertPath);
return true;
}
@@ -259,7 +266,7 @@ public class MMHHIndex extends IoTDBIndex {
List<DistSeries> res;
Function<PartialPath, TVList> loadSeriesFunc =
- RTreeIndex.getLoadSeriesFunc(context, tsDataType,
mmhhFeatureExtractor);
+ getLoadSeriesFunc(context, tsDataType, mmhhFeatureExtractor);
List<PartialPath> paths;
try {
Pair<List<PartialPath>, Integer> pathsPair =
@@ -332,6 +339,17 @@ public class MMHHIndex extends IoTDBIndex {
return constructSearchDataset(res, alignedByTime);
}
+ private BiFunction<double[], TVList, Double> getCalcExactDistFunc() {
+ return (queryTs, tvList) -> {
+ double sum = 0;
+ for (int i = 0; i < queryTs.length; i++) {
+ final double dp = queryTs[i] - IndexUtils.getDoubleFromAnyType(tvList,
i);
+ sum += dp * dp;
+ }
+ return Math.sqrt(sum);
+ };
+ }
+
@Override
public QueryDataSet query(
Map<String, Object> queryProps,
@@ -344,14 +362,52 @@ public class MMHHIndex extends IoTDBIndex {
return noPruneQuery(queryProps, iIndexUsable, context,
candidateOrderOptimize, alignedByTime);
}
+ List<PartialPath> uninvolvedList;
+ try {
+ uninvolvedList =
MManager.getInstance().getAllTimeseriesPath(indexSeries);
+ uninvolvedList.removeAll(involvedPathSet);
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new QueryIndexException(e.getMessage());
+ }
+ uninvolvedList.addAll(((WholeMatchIndexUsability)
iIndexUsable).getUnusableRange());
+
MMHHQueryStruct struct = initQuery(queryProps);
this.tempQueryStruct = struct;
long featureStart = System.nanoTime();
Long queryCode = mmhhFeatureExtractor.processQuery(struct.patterns);
IndexStatManager.featureExtractCost += System.nanoTime() - featureStart;
List<DistSeries> res = hammingSearch(queryCode, struct.topK, context);
+
+ if (!uninvolvedList.isEmpty()) {
+ Function<PartialPath, TVList> loadSeriesFunc =
+ getLoadSeriesFunc(context, tsDataType, mmhhFeatureExtractor);
+ BiFunction<double[], TVList, Double> exactDistFunc =
getCalcExactDistFunc();
+ //
+ // PriorityQueue<DistSeries> topKPQ = new
PriorityQueue<>(struct.topK,
+ // new DistSeriesComparator());
+ // topKPQ.addAll(res);
+ // double kthMinDist = topKPQ.isEmpty() ? Double.MAX_VALUE :
topKPQ.peek().dist;
+ // for (PartialPath path : uninvolvedList) {
+ // TVList tvList = loadSeriesFunc.apply(path);
+ // double tempDist = exactDistFunc.apply(struct.patterns, tvList);
+ // if (topKPQ.size() < struct.topK || tempDist < kthMinDist) {
+ // if (topKPQ.size() == struct.topK) {
+ // topKPQ.poll();
+ // }
+ // topKPQ.add(new DistSeries(tempDist, tvList, path));
+ // kthMinDist = topKPQ.peek().dist;
+ // }
+ // }
+ for (PartialPath path : uninvolvedList) {
+ TVList rawData = loadSeriesFunc.apply(path);
+ res.add(new DistSeries(0, rawData, path));
+ }
+ sortByEd(res);
+ }
+
for (DistSeries ds : res) {
- ds.partialPath = ds.partialPath.concatNode(String.format("(D=%.2f)",
ds.dist));
+ ds.partialPath = ds.partialPath.concatNode(String.format("(D_Ham=%d)",
(int) ds.dist));
}
return constructSearchDataset(res, alignedByTime);
}
@@ -361,7 +417,21 @@ public class MMHHIndex extends IoTDBIndex {
// Long.toBinaryString(queryCode)));
List<DistSeries> res = new ArrayList<>();
Function<PartialPath, TVList> loadRaw =
- RTreeIndex.getLoadSeriesFunc(context, tsDataType,
mmhhFeatureExtractor);
+ getLoadSeriesFunc(context, tsDataType, mmhhFeatureExtractor);
+
+ // it's a hyper-parameter. If itemSize/topK < lambda, scan the list
instead of hamming search
+ float lambda = 3;
+ if (itemSize <= topK || (float) itemSize / topK <= lambda) {
+ // scan
+ hashLookupTable.forEach(
+ (k, v) -> {
+ for (Long seriesId : v) {
+ readRawData(0, seriesId, loadRaw);
+ }
+ });
+ sortByEd(res);
+ return res;
+ }
for (int radius = 0; radius <= hashLength; radius++) {
boolean full = scanBucket(queryCode, 0, radius, 0, topK, loadRaw, res);
if (full) {
diff --git a/server/src/test/java/org/apache/iotdb/db/index/IndexTestUtils.java
b/server/src/test/java/org/apache/iotdb/db/index/IndexTestUtils.java
index 0f37b63..62c9ce1 100644
--- a/server/src/test/java/org/apache/iotdb/db/index/IndexTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/index/IndexTestUtils.java
@@ -70,7 +70,7 @@ public class IndexTestUtils {
public static String getStringFromList(List<Float> list, int start, int end)
{
StringBuilder array = new StringBuilder();
for (int i = start; i < end; i++) {
- array.append(list.get(i)).append(',');
+ array.append(String.format("%.3f", list.get(i))).append(',');
}
array.deleteCharAt(array.length() - 1);
return array.toString();
diff --git a/server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
b/server/src/test/java/org/apache/iotdb/db/index/it/DemoELBWindIT.java
similarity index 90%
copy from server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
copy to server/src/test/java/org/apache/iotdb/db/index/it/DemoELBWindIT.java
index 35daf04..e6c9df8 100644
--- a/server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/index/it/DemoELBWindIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -37,6 +38,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@@ -44,9 +46,11 @@ import static
org.apache.iotdb.db.index.IndexTestUtils.getStringFromList;
import static org.apache.iotdb.db.index.common.IndexType.ELB_INDEX;
import static org.junit.Assert.fail;
-public class ELBWindIT {
+public class DemoELBWindIT {
private static final String insertPattern = "INSERT INTO %s(timestamp, %s)
VALUES (%d, %.3f)";
+ private static final String insertPattern_show =
+ "INSERT INTO %s(timestamp, %s) VALUES (%s, %.3f)";
private static final String storageGroupSub = "root.wind1";
private static final String storageGroupWhole = "root.wind2";
@@ -118,8 +122,8 @@ public class ELBWindIT {
long t = Long.parseLong(data[0]);
float v = Float.parseFloat(data[1]);
- // subInput.putDouble(t, v);
- subInput.putFloat(idx, v);
+ subInput.putFloat(t, v);
+ // subInput.putFloat(idx, v);
}
idx++;
}
@@ -129,20 +133,27 @@ public class ELBWindIT {
// subInput.putDouble(i, i * 10);
// }
for (int i = 0; i < subInput.size(); i++) {
- statement.execute(
- String.format(
- insertPattern,
- speed1Device,
- speed1Sensor,
- subInput.getTime(i),
- subInput.getFloat(i)));
- // System.out.println(
+ // statement.execute(
// String.format(
// insertPattern,
// speed1Device,
// speed1Sensor,
// subInput.getTime(i),
// subInput.getFloat(i)));
+
+ String insertSQL =
+ String.format(
+ insertPattern_show,
+ speed1Device,
+ speed1Sensor,
+ RpcUtils.formatDatetime(
+ "iso8601",
+ RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
+ subInput.getTime(i),
+ ZoneId.systemDefault()),
+ subInput.getFloat(i));
+ statement.execute(insertSQL);
+ System.out.println(insertSQL);
}
statement.execute("flush");
// System.out.println("==========================");
@@ -156,7 +167,7 @@ public class ELBWindIT {
@Test
public void checkReadWithoutCreateTS() throws ClassNotFoundException {
- checkRead(false);
+ checkRead(true);
}
private void checkRead(boolean createTS) throws ClassNotFoundException {
@@ -177,7 +188,7 @@ public class ELBWindIT {
String querySQL =
"SELECT speed.* FROM root.wind1.azq01 WHERE Speed "
+ String.format("CONTAIN (%s) WITH TOLERANCE 10 ",
getStringFromList(pattern, 0, 30))
- + String.format("CONCAT (%s) WITH TOLERANCE 25 ",
getStringFromList(pattern, 30, 70))
+ + String.format("CONCAT (%s) WITH TOLERANCE 20 ",
getStringFromList(pattern, 30, 70))
+ String.format(
"CONCAT (%s) WITH TOLERANCE 10 ", getStringFromList(pattern,
70, 100));
System.out.println(querySQL);
diff --git a/server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
b/server/src/test/java/org/apache/iotdb/db/index/it/DemoMMHHWindIT.java
similarity index 54%
copy from server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
copy to server/src/test/java/org/apache/iotdb/db/index/it/DemoMMHHWindIT.java
index 35daf04..a92a481 100644
--- a/server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/index/it/DemoMMHHWindIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -31,22 +32,27 @@ import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
+import java.io.File;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import static org.apache.iotdb.db.index.IndexTestUtils.getStringFromList;
-import static org.apache.iotdb.db.index.common.IndexType.ELB_INDEX;
+import static org.apache.iotdb.db.index.common.IndexConstant.MODEL_PATH;
+import static org.apache.iotdb.db.index.common.IndexType.MMHH;
import static org.junit.Assert.fail;
-public class ELBWindIT {
+public class DemoMMHHWindIT {
private static final String insertPattern = "INSERT INTO %s(timestamp, %s)
VALUES (%d, %.3f)";
+ private static final String insertPattern_show =
+ "INSERT INTO %s(timestamp, %s) VALUES (%s, %.3f)";
private static final String storageGroupSub = "root.wind1";
private static final String storageGroupWhole = "root.wind2";
@@ -55,15 +61,16 @@ public class ELBWindIT {
private static final String speed1Device = "root.wind1.azq01";
private static final String speed1Sensor = "speed";
- private static final String directionDevicePattern = "root.wind2.%d";
- private static final String directionPattern = "root.wind2.%d.direction";
- private static final String directionSensor = "direction";
+ private static final String speedDevicePattern = "root.wind2.%d";
+ private static final String speedPattern = "root.wind2.%d.speed";
+ private static final String speedSensor = "speed";
private static final String indexSub = speed1;
- private static final String indexWhole = "root.wind2.*.direction";
- private static final int wholeSize = 5;
+ private static final String indexWhole = "root.wind2.*.speed";
+ private static final int wholeSize = 100;
private static final int wholeDim = 100;
- private static final int subLength = 50;
+ private static final int PAA_Dim = 4;
+ private static final int hashLength = 48;
@Before
public void setUp() throws Exception {
@@ -94,17 +101,39 @@ public class ELBWindIT {
System.out.println(
String.format("CREATE TIMESERIES %s WITH
DATATYPE=FLOAT,ENCODING=PLAIN", speed1));
for (int i = 0; i < wholeSize; i++) {
- String wholePath = String.format(directionPattern, i);
+ String wholePath = String.format(speedPattern, i);
statement.execute(
String.format("CREATE TIMESERIES %s WITH
DATATYPE=FLOAT,ENCODING=PLAIN", wholePath));
System.out.println(
String.format("CREATE TIMESERIES %s WITH
DATATYPE=FLOAT,ENCODING=PLAIN", wholePath));
}
}
- statement.execute(
- String.format("CREATE INDEX ON %s WITH INDEX=%s, BLOCK_SIZE=%d",
indexSub, ELB_INDEX, 5));
System.out.println(
- String.format("CREATE INDEX ON %s WITH INDEX=%s, BLOCK_SIZE=%d",
indexSub, ELB_INDEX, 5));
+ String.format(
+ "CREATE INDEX ON %s WITH INDEX=%s, SERIES_LENGTH=%d,
HASH_LENGTH=%d, %s=%s",
+ indexWhole,
+ MMHH,
+ wholeDim,
+ hashLength,
+ MODEL_PATH,
+ //
+ //
"\"/Users/kangrong/code/github/deep-learning/hash_journal/TAH_project/src/mmhh.pt\""
+ "\"src/test/resources/index/mmhh.pt\""));
+ // String modelPath =
+ //
"/Users/kangrong/code/github/deep-learning/hash_journal/TAH_project/src/mmhh.pt";
+ String modelPath = "src/test/resources/index/mmhh.pt";
+ if (!(new File(modelPath).exists())) {
+ fail("model file path is not correct!");
+ }
+ statement.execute(
+ String.format(
+ "CREATE INDEX ON %s WITH INDEX=%s, SERIES_LENGTH=%d,
HASH_LENGTH=%d, %s=%s",
+ indexWhole,
+ MMHH,
+ wholeDim,
+ hashLength,
+ MODEL_PATH,
+ String.format("\"%s\"", modelPath)));
TVList subInput =
TVListAllocator.getInstance().allocate(TSDataType.FLOAT);
// read base
@@ -118,8 +147,8 @@ public class ELBWindIT {
long t = Long.parseLong(data[0]);
float v = Float.parseFloat(data[1]);
- // subInput.putDouble(t, v);
- subInput.putFloat(idx, v);
+ subInput.putFloat(t, v);
+ // subInput.putFloat(idx, v);
}
idx++;
}
@@ -128,23 +157,30 @@ public class ELBWindIT {
// for (int i = 0; i < subLength; i++) {
// subInput.putDouble(i, i * 10);
// }
- for (int i = 0; i < subInput.size(); i++) {
- statement.execute(
- String.format(
- insertPattern,
- speed1Device,
- speed1Sensor,
- subInput.getTime(i),
- subInput.getFloat(i)));
- // System.out.println(
- // String.format(
- // insertPattern,
- // speed1Device,
- // speed1Sensor,
- // subInput.getTime(i),
- // subInput.getFloat(i)));
+ assert subInput.size() == wholeDim * wholeSize;
+ for (int i = 0; i < wholeSize; i++) {
+ long startTime = subInput.getTime(i * wholeDim);
+ String device = String.format(speedDevicePattern, startTime);
+ for (int j = 0; j < wholeDim; j++) {
+ String insertSQL =
+ String.format(
+ insertPattern_show,
+ device,
+ speedSensor,
+ RpcUtils.formatDatetime(
+ "iso8601",
+ RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
+ subInput.getTime(i * wholeDim + j),
+ ZoneId.systemDefault()),
+ subInput.getFloat(i * wholeDim + j));
+ // System.out.println(insertSQL);
+ statement.execute(insertSQL);
+ }
}
- statement.execute("flush");
+ // 有flush或没有flush结果可能不同
+ // 有flush,会找桶,桶里的序列放到result list按ed排序,满了直接离开;hamming
dist不一定等于ed,所以可能漏掉一些ed比较小的结果
+ // 但没有flush,会把uninvolved 序列全加入result list再按ed排序,这样可能会有更优的结果
+ // statement.execute("flush");
// System.out.println("==========================");
// System.out.println(IndexManager.getInstance().getRouter());
@@ -165,24 +201,60 @@ public class ELBWindIT {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
Statement statement = connection.createStatement()) {
- String queryPath =
"/Users/kangrong/tsResearch/tols/JINFENG/d2/out_sub_pattern.csv";
- List<Float> pattern = new ArrayList<>();
- try (BufferedReader csvReader = new BufferedReader(new
FileReader(queryPath))) {
+ String q1Line =
+ "20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,"
+ +
"20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,"
+ + "20.0,20.0,20.0,20.0,20.0,"
+ +
"20.5,21.0,21.5,22.0,22.5,23.0,23.5,24.0,24.5,25.0,25.5,26.0,26.5,27.0,"
+ +
"27.5,28.0,28.5,29.0,29.5,30.0,29.5,29.0,28.5,28.0,27.5,27.0,26.5,26.0,"
+ + "25.5,25.0,24.5,24.0,23.5,23.0,22.5,22.0,21.5,21.0,20.5,"
+ +
"20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,"
+ +
"20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0";
+ String querySQL =
+ String.format("SELECT TOP 3 speed FROM root.wind2.* WHERE speed LIKE
(%s)", q1Line);
+ System.out.println(querySQL);
+ statement.setQueryTimeout(200);
+ boolean hasIndex = statement.execute(querySQL);
+ // String gt = "Time,root.wind1.azq01.speed.17,\n";
+ Assert.assertTrue(hasIndex);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ sb.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ sb.append("\n");
+ while (resultSet.next()) {
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ sb.append(resultSet.getString(i)).append(",");
+ }
+ sb.append("\n");
+ }
+ System.out.println(sb);
+ // Assert.assertEquals(gt, sb.toString());
+ }
+ // ========================================
+ String basePath =
"/Users/kangrong/tsResearch/tols/JINFENG/d2/out_sub_base.csv";
+ List<Float> base = new ArrayList<>();
+ try (BufferedReader csvReader = new BufferedReader(new
FileReader(basePath))) {
String row;
+ int idx = 0;
while ((row = csvReader.readLine()) != null) {
- float v = Float.parseFloat(row);
- pattern.add(v);
+ if (idx >= 1) {
+ String[] data = row.split(",");
+ long t = Long.parseLong(data[0]);
+ float v = Float.parseFloat(data[1]);
+ base.add(v);
+ }
+ idx++;
}
}
- String querySQL =
- "SELECT speed.* FROM root.wind1.azq01 WHERE Speed "
- + String.format("CONTAIN (%s) WITH TOLERANCE 10 ",
getStringFromList(pattern, 0, 30))
- + String.format("CONCAT (%s) WITH TOLERANCE 25 ",
getStringFromList(pattern, 30, 70))
- + String.format(
- "CONCAT (%s) WITH TOLERANCE 10 ", getStringFromList(pattern,
70, 100));
+ String q2Line = getStringFromList(base, 7600, 7600 + 100);
+ querySQL =
+ String.format("SELECT TOP 3 speed FROM root.wind2.* WHERE speed LIKE
(%s)", q2Line);
System.out.println(querySQL);
statement.setQueryTimeout(200);
- boolean hasIndex = statement.execute(querySQL);
+ hasIndex = statement.execute(querySQL);
// String gt = "Time,root.wind1.azq01.speed.17,\n";
Assert.assertTrue(hasIndex);
try (ResultSet resultSet = statement.getResultSet()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
b/server/src/test/java/org/apache/iotdb/db/index/it/DemoRTreeWindIT.java
similarity index 59%
rename from server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
rename to server/src/test/java/org/apache/iotdb/db/index/it/DemoRTreeWindIT.java
index 35daf04..1891d56 100644
--- a/server/src/test/java/org/apache/iotdb/db/index/it/ELBWindIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/index/it/DemoRTreeWindIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -37,16 +38,19 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import static org.apache.iotdb.db.index.IndexTestUtils.getStringFromList;
-import static org.apache.iotdb.db.index.common.IndexType.ELB_INDEX;
+import static org.apache.iotdb.db.index.common.IndexType.RTREE_PAA;
import static org.junit.Assert.fail;
-public class ELBWindIT {
+public class DemoRTreeWindIT {
private static final String insertPattern = "INSERT INTO %s(timestamp, %s)
VALUES (%d, %.3f)";
+ private static final String insertPattern_show =
+ "INSERT INTO %s(timestamp, %s) VALUES (%s, %.3f)";
private static final String storageGroupSub = "root.wind1";
private static final String storageGroupWhole = "root.wind2";
@@ -55,15 +59,15 @@ public class ELBWindIT {
private static final String speed1Device = "root.wind1.azq01";
private static final String speed1Sensor = "speed";
- private static final String directionDevicePattern = "root.wind2.%d";
- private static final String directionPattern = "root.wind2.%d.direction";
- private static final String directionSensor = "direction";
+ private static final String speedDevicePattern = "root.wind2.%d";
+ private static final String speedPattern = "root.wind2.%d.speed";
+ private static final String speedSensor = "speed";
private static final String indexSub = speed1;
- private static final String indexWhole = "root.wind2.*.direction";
- private static final int wholeSize = 5;
+ private static final String indexWhole = "root.wind2.*.speed";
+ private static final int wholeSize = 100;
private static final int wholeDim = 100;
- private static final int subLength = 50;
+ private static final int PAA_Dim = 4;
@Before
public void setUp() throws Exception {
@@ -94,17 +98,21 @@ public class ELBWindIT {
System.out.println(
String.format("CREATE TIMESERIES %s WITH
DATATYPE=FLOAT,ENCODING=PLAIN", speed1));
for (int i = 0; i < wholeSize; i++) {
- String wholePath = String.format(directionPattern, i);
+ String wholePath = String.format(speedPattern, i);
statement.execute(
String.format("CREATE TIMESERIES %s WITH
DATATYPE=FLOAT,ENCODING=PLAIN", wholePath));
System.out.println(
String.format("CREATE TIMESERIES %s WITH
DATATYPE=FLOAT,ENCODING=PLAIN", wholePath));
}
}
- statement.execute(
- String.format("CREATE INDEX ON %s WITH INDEX=%s, BLOCK_SIZE=%d",
indexSub, ELB_INDEX, 5));
System.out.println(
- String.format("CREATE INDEX ON %s WITH INDEX=%s, BLOCK_SIZE=%d",
indexSub, ELB_INDEX, 5));
+ String.format(
+ "CREATE INDEX ON %s WITH INDEX=%s, SERIES_LENGTH=%d,
FEATURE_DIM=%d, MAX_ENTRIES=%d, MIN_ENTRIES=%d",
+ indexWhole, RTREE_PAA, wholeDim, PAA_Dim, 10, 2));
+ statement.execute(
+ String.format(
+ "CREATE INDEX ON %s WITH INDEX=%s, SERIES_LENGTH=%d,
FEATURE_DIM=%d, MAX_ENTRIES=%d, MIN_ENTRIES=%d",
+ indexWhole, RTREE_PAA, wholeDim, PAA_Dim, 10, 2));
TVList subInput =
TVListAllocator.getInstance().allocate(TSDataType.FLOAT);
// read base
@@ -118,8 +126,8 @@ public class ELBWindIT {
long t = Long.parseLong(data[0]);
float v = Float.parseFloat(data[1]);
- // subInput.putDouble(t, v);
- subInput.putFloat(idx, v);
+ subInput.putFloat(t, v);
+ // subInput.putFloat(idx, v);
}
idx++;
}
@@ -128,23 +136,27 @@ public class ELBWindIT {
// for (int i = 0; i < subLength; i++) {
// subInput.putDouble(i, i * 10);
// }
- for (int i = 0; i < subInput.size(); i++) {
- statement.execute(
- String.format(
- insertPattern,
- speed1Device,
- speed1Sensor,
- subInput.getTime(i),
- subInput.getFloat(i)));
- // System.out.println(
- // String.format(
- // insertPattern,
- // speed1Device,
- // speed1Sensor,
- // subInput.getTime(i),
- // subInput.getFloat(i)));
+ assert subInput.size() == wholeDim * wholeSize;
+ for (int i = 0; i < wholeSize; i++) {
+ long startTime = subInput.getTime(i * wholeDim);
+ String device = String.format(speedDevicePattern, startTime);
+ for (int j = 0; j < wholeDim; j++) {
+ String insertSQL =
+ String.format(
+ insertPattern_show,
+ device,
+ speedSensor,
+ RpcUtils.formatDatetime(
+ "iso8601",
+ RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
+ subInput.getTime(i * wholeDim + j),
+ ZoneId.systemDefault()),
+ subInput.getFloat(i * wholeDim + j));
+ // System.out.println(insertSQL);
+ statement.execute(insertSQL);
+ }
}
- statement.execute("flush");
+ // statement.execute("flush");
// System.out.println("==========================");
// System.out.println(IndexManager.getInstance().getRouter());
@@ -165,24 +177,60 @@ public class ELBWindIT {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
Statement statement = connection.createStatement()) {
- String queryPath =
"/Users/kangrong/tsResearch/tols/JINFENG/d2/out_sub_pattern.csv";
- List<Float> pattern = new ArrayList<>();
- try (BufferedReader csvReader = new BufferedReader(new
FileReader(queryPath))) {
+ String q1Line =
+ "20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,"
+ +
"20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,"
+ + "20.0,20.0,20.0,20.0,20.0,"
+ +
"20.5,21.0,21.5,22.0,22.5,23.0,23.5,24.0,24.5,25.0,25.5,26.0,26.5,27.0,"
+ +
"27.5,28.0,28.5,29.0,29.5,30.0,29.5,29.0,28.5,28.0,27.5,27.0,26.5,26.0,"
+ + "25.5,25.0,24.5,24.0,23.5,23.0,22.5,22.0,21.5,21.0,20.5,"
+ +
"20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,"
+ +
"20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0,20.0";
+ String querySQL =
+ String.format("SELECT TOP 3 speed FROM root.wind2.* WHERE speed LIKE
(%s)", q1Line);
+ System.out.println(querySQL);
+ statement.setQueryTimeout(200);
+ boolean hasIndex = statement.execute(querySQL);
+ // String gt = "Time,root.wind1.azq01.speed.17,\n";
+ Assert.assertTrue(hasIndex);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ sb.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ sb.append("\n");
+ while (resultSet.next()) {
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ sb.append(resultSet.getString(i)).append(",");
+ }
+ sb.append("\n");
+ }
+ System.out.println(sb);
+ // Assert.assertEquals(gt, sb.toString());
+ }
+ // ========================================
+ String basePath =
"/Users/kangrong/tsResearch/tols/JINFENG/d2/out_sub_base.csv";
+ List<Float> base = new ArrayList<>();
+ try (BufferedReader csvReader = new BufferedReader(new
FileReader(basePath))) {
String row;
+ int idx = 0;
while ((row = csvReader.readLine()) != null) {
- float v = Float.parseFloat(row);
- pattern.add(v);
+ if (idx >= 1) {
+ String[] data = row.split(",");
+ long t = Long.parseLong(data[0]);
+ float v = Float.parseFloat(data[1]);
+ base.add(v);
+ }
+ idx++;
}
}
- String querySQL =
- "SELECT speed.* FROM root.wind1.azq01 WHERE Speed "
- + String.format("CONTAIN (%s) WITH TOLERANCE 10 ",
getStringFromList(pattern, 0, 30))
- + String.format("CONCAT (%s) WITH TOLERANCE 25 ",
getStringFromList(pattern, 30, 70))
- + String.format(
- "CONCAT (%s) WITH TOLERANCE 10 ", getStringFromList(pattern,
70, 100));
+ String q2Line = getStringFromList(base, 7600, 7600 + 100);
+ querySQL =
+ String.format("SELECT TOP 3 speed FROM root.wind2.* WHERE speed LIKE
(%s)", q2Line);
System.out.println(querySQL);
statement.setQueryTimeout(200);
- boolean hasIndex = statement.execute(querySQL);
+ hasIndex = statement.execute(querySQL);
// String gt = "Time,root.wind1.azq01.speed.17,\n";
Assert.assertTrue(hasIndex);
try (ResultSet resultSet = statement.getResultSet()) {
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
index 0665db0..62a0ac6 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
@@ -34,6 +34,9 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -82,6 +85,8 @@ public class IoTDBInterpreter extends AbstractInterpreter {
private static final String EXIT_COMMAND = "exit";
private static final String HELP = "help";
private static final String IMPORT_CMD = "import";
+ private static final String LOAD = "load";
+ private static final String SELECT_SERIES = "select series";
static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
static final String SET_QUERY_TIMEOUT = "set query_time_timeout";
private static final String SET_MAX_DISPLAY_NUM = "set max_display_num";
@@ -212,6 +217,43 @@ public class IoTDBInterpreter extends AbstractInterpreter {
if (nonSupportCommandSet.contains(specialCmd)) {
return new InterpreterResult(Code.ERROR, "Not supported in Zeppelin: " +
specialCmd);
}
+ if (specialCmd.startsWith(SELECT_SERIES.toLowerCase())) {
+ // no query db. draw and return
+ String line = cmd.substring(SELECT_SERIES.length(), cmd.length() -
1).trim();
+ String[] series = line.substring(1, line.length() - 1).split(",");
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("Time").append(TAB).append("Value");
+ stringBuilder.append(NEWLINE);
+ for (int i = 0; i < series.length; i++) {
+
stringBuilder.append(i).append(TAB).append(series[i].trim()).append(NEWLINE);
+ }
+ InterpreterResult interpreterResult = new
InterpreterResult(Code.SUCCESS);
+ interpreterResult.add(Type.TABLE, stringBuilder.toString());
+ return interpreterResult;
+ }
+ if (specialCmd.startsWith(LOAD.toLowerCase())) {
+ String[] values = cmd.split(" ");
+ if (values.length != 2) {
+ return new InterpreterResult(
+ Code.ERROR,
+ String.format("load sql format error, please input like %s
SQL_FILE_PATH", LOAD));
+ }
+ int idx = 0;
+ String filePath = values[1];
+ try (Statement statement = connection.createStatement();
+ BufferedReader csvReader = new BufferedReader(new
FileReader(filePath))) {
+ String sqlLine;
+ while ((sqlLine = csvReader.readLine()) != null) {
+ sqlLine = sqlLine.trim().split(SEMICOLON)[0];
+ statement.execute(sqlLine.trim());
+ idx++;
+ }
+ } catch (IOException | SQLException e) {
+ return new InterpreterResult(Code.ERROR, e.getMessage());
+ }
+ return new InterpreterResult(
+ Code.SUCCESS, String.format("Load finished, insert %d lines", idx));
+ }
if (specialCmd.startsWith(SET_TIMESTAMP_DISPLAY)) {
String[] values = cmd.split(EQUAL_SIGN);
if (values.length != 2) {
diff --git
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
index 0d775aa..000b5fc 100644
---
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
+++
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
@@ -21,6 +21,7 @@ package org.apache.zeppelin.iotdb;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@@ -30,6 +31,12 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Properties;
import static
org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_ENABLE_RPC_COMPRESSION;
@@ -391,4 +398,49 @@ public class IoTDBInterpreterTest {
Assert.assertEquals(Code.SUCCESS, actual.code());
Assert.assertEquals(gt, actual.message().get(0).getData());
}
+
+ @Test
+ public void loadSQLInsert() throws SQLException, ClassNotFoundException {
+ String cmd = "LOAD
/Users/kangrong/tsResearch/tols/JINFENG/d2/elb_insert.sql";
+ InterpreterResult actual = interpreter.internalInterpret(cmd, null);
+ Assert.assertNotNull(actual);
+ Assert.assertEquals(Code.SUCCESS, actual.code());
+ Assert.assertEquals("Load finished, insert 9990 lines",
actual.message().get(0).getData());
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root",
"root");
+ Statement statement = connection.createStatement();
+
+ statement.execute("SELECT count(*) from root.wind1.azq01");
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ sb.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+ sb.append("\n");
+ while (resultSet.next()) {
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ sb.append(resultSet.getString(i)).append(",");
+ }
+ sb.append("\n");
+ }
+ System.out.println(sb);
+ String countGt = "count(root.wind1.azq01.speed),\n" + "9990,\n";
+ Assert.assertEquals(countGt, sb.toString());
+ }
+ }
+
+  /**
+   * Checks that {@code SELECT SERIES (...)} renders the literal values as a two-column table whose
+   * "Time" column is the 0-based position of each value, with surrounding whitespace trimmed.
+   */
+  @Test
+  public void testSelectSeries() {
+    // NOTE(review): the extra trailing ')' is needed because the SELECT SERIES handler currently
+    // drops the last character of the command before stripping the surrounding parentheses.
+    InterpreterResult actual =
+        interpreter.internalInterpret("SELECT SERIES (1.2,3.2, 18239. ,3.3))", null);
+    // Null check must come before any dereference of the result.
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(Code.SUCCESS, actual.code());
+    String gt = "Time\tValue\n" + "0\t1.2\n" + "1\t3.2\n" + "2\t18239.\n" + "3\t3.3\n";
+    Assert.assertEquals(gt, actual.message().get(0).getData());
+  }
}