This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch TOP-K-DTW
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TOP-K-DTW by this push:
new 90edc971078 Create DTWSessionPool.java
90edc971078 is described below
commit 90edc971078fa7c0168171a5e7e4a12b2247a65b
Author: YongzaoDan <[email protected]>
AuthorDate: Sat Jun 3 14:21:57 2023 +0800
Create DTWSessionPool.java
---
.../main/java/org/apache/iotdb/DTWSessionPool.java | 168 +++++++++++++++++++++
1 file changed, 168 insertions(+)
diff --git a/example/session/src/main/java/org/apache/iotdb/DTWSessionPool.java
b/example/session/src/main/java/org/apache/iotdb/DTWSessionPool.java
new file mode 100644
index 00000000000..0ece8e54159
--- /dev/null
+++ b/example/session/src/main/java/org/apache/iotdb/DTWSessionPool.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class DTWSessionPool {
+
+ private static final String DATABASE = "root.database";
+ private static final String S = "root.database.device.s";
+ private static final String P = "root.database.device.p";
+
+ private static final int BATCH_SIZE = 32768;
+ private static final long TIMESTAMP_STRIDE = 1000L;
+ private static final long DATA_START_TIME = 954475200000L;
+
+ private static final int PATTERN_COUNT = 100;
+ private static final int PATTERN_LENGTH = 20;
+ private static final double PATTERN_RANGE = 100.0;
+
+ private static final int SERIES_LENGTH = 10000;
+ private static final double SERIES_RANGE = 1000.0;
+
+ private static final String SQL =
+ String.format(
+ "select top_k_dtw_sliding_window(s, p, 'k'='%d') from
root.database.device;",
+ PATTERN_COUNT);
+
+ private static SessionPool sessionPool;
+
+ private static void constructSessionPool() {
+ sessionPool =
+ new SessionPool.Builder()
+ .host("127.0.0.1")
+ .port(6667)
+ .user("root")
+ .password("root")
+ .maxSize(3)
+ .build();
+ }
+
+ private static void registerSchema() {
+ try {
+ sessionPool.deleteDatabase(DATABASE);
+ System.out.println("Delete database successfully");
+ } catch (Exception e) {
+ // ignore
+ }
+ try {
+ sessionPool.createDatabase(DATABASE);
+ sessionPool.createTimeseries(
+ S, TSDataType.DOUBLE, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED);
+ sessionPool.createTimeseries(
+ P, TSDataType.DOUBLE, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED);
+ System.out.println("Create timeseries successfully");
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ private static void insertRecordByTablet()
+ throws IoTDBConnectionException, StatementExecutionException {
+ Random random = new Random();
+ List<MeasurementSchema> sList =
+ Collections.singletonList(new MeasurementSchema("s",
TSDataType.DOUBLE));
+ List<MeasurementSchema> pList =
+ Collections.singletonList(new MeasurementSchema("p",
TSDataType.DOUBLE));
+
+ double[] pattern = new double[PATTERN_LENGTH];
+ Tablet tablet = new Tablet("root.database.device", pList, BATCH_SIZE);
+ for (int i = 0; i < PATTERN_LENGTH; i++) {
+ // each point is a random double in [-100, 100]
+ pattern[i] = PATTERN_RANGE * (2.0 * random.nextDouble() - 1.0);
+ tablet.addTimestamp(i, i * TIMESTAMP_STRIDE);
+ tablet.addValue("p", i, pattern[i]);
+ }
+ tablet.rowSize = PATTERN_LENGTH;
+ sessionPool.insertTablet(tablet);
+ System.out.println("Insert pattern successfully");
+
+ long currentTime = DATA_START_TIME;
+ for (int i = 0; i < PATTERN_COUNT; i++) {
+ int rowCount = 0;
+ tablet = new Tablet("root.database.device", sList, BATCH_SIZE);
+ // each point is a random double in [-1000, 1000]
+ for (int j = 0; j < SERIES_LENGTH; j++) {
+ tablet.addTimestamp(rowCount, currentTime);
+ tablet.addValue("s", rowCount, SERIES_RANGE * (2.0 *
random.nextDouble() - 1.0));
+ currentTime += TIMESTAMP_STRIDE;
+ rowCount++;
+ }
+
+ for (int j = 0; j < PATTERN_LENGTH; j++) {
+ // Construct a similar pattern
+ int repeatTime = (int) Math.abs(random.nextGaussian()) + 1;
+ if (j == 0 || j == PATTERN_LENGTH - 1) {
+ // Make sure the first and last points are the same
+ repeatTime = 1;
+ }
+ for (int k = 0; k < repeatTime; k++) {
+ tablet.addTimestamp(rowCount, currentTime);
+ tablet.addValue("s", rowCount, pattern[j]);
+ currentTime += TIMESTAMP_STRIDE;
+ rowCount++;
+ }
+ }
+
+ tablet.rowSize = rowCount;
+ sessionPool.insertTablet(tablet);
+ }
+ System.out.println("Insert series successfully");
+ }
+
+ private static void executeQuery() throws IoTDBConnectionException,
StatementExecutionException {
+ long startTime = System.currentTimeMillis();
+ try (SessionDataSetWrapper wrapper =
sessionPool.executeQueryStatement(SQL, 10 * 60 * 1000)) {
+ while (wrapper.hasNext()) {
+ RowRecord record = wrapper.next();
+ System.out.println(record);
+ }
+ }
+ long endTime = System.currentTimeMillis();
+
+ System.out.printf(
+ "Query series length: %d pattern length: %d cost: %fs%n",
+ SERIES_LENGTH * PATTERN_COUNT, PATTERN_LENGTH, (double) (endTime -
startTime) / 1000.0);
+ }
+
+ public static void main(String[] args)
+ throws StatementExecutionException, IoTDBConnectionException {
+
+ constructSessionPool();
+ registerSchema();
+ insertRecordByTablet();
+ executeQuery();
+ sessionPool.close();
+ }
+}