This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch xianyi in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4fd32d6a965b725bcd6708dddb0374f32aebf536 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Nov 25 22:08:13 2021 +0800 fix udf bug --- .../main/java/org/apache/iotdb/SessionExample.java | 34 ++++++++++++++++++---- .../db/query/udf/core/executor/UDTFExecutor.java | 1 + .../query/udf/core/layer/RawQueryInputLayer.java | 1 + ...nputColumnSingleReferenceIntermediateLayer.java | 4 +++ 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 2f60361..b463f9d 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -71,13 +71,16 @@ public class SessionExample { // createMultiTimeseries(); // insertRecord(); // long start = System.currentTimeMillis(); - // insertTablet(); + insertTablet(); // System.out.println(System.currentTimeMillis() - start); // insertTabletWithNullValues(); // insertTablets(); // insertRecords(); // selectInto(); // createAndDropContinuousQueries(); + + // session.executeNonQueryStatement("flush"); + // nonQuery(); // query(); // queryWithTimeout(); @@ -122,6 +125,26 @@ public class SessionExample { session.close(); } + private static void query() throws IoTDBConnectionException, StatementExecutionException { + // long s = System.currentTimeMillis(); + // SessionDataSet dataSet = session.executeQueryStatement("select s1, s2, s3 from + // root.sg1.d1"); + // while (dataSet.hasNext()) { + // dataSet.next(); + // } + // dataSet.close(); + // System.out.println(System.currentTimeMillis() - s); + + // long s = System.currentTimeMillis(); + // SessionDataSet dataSet = + // session.executeQueryStatement("select en(s1), en(s2), en(s3) from root.sg1.d1"); + // while (dataSet.hasNext()) { + // dataSet.next(); + // } + // dataSet.close(); + // System.out.println(System.currentTimeMillis() - s); + } + private static void createAndDropContinuousQueries() throws StatementExecutionException, IoTDBConnectionException { session.executeNonQueryStatement( @@ -391,20 +414,21 @@ public class SessionExample { // The schema of measurements of one device // only measurementId and data type in MeasurementSchema take effects in Tablet List<IMeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT64)); - schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.INT64)); + schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.DOUBLE)); + schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.DOUBLE)); + schemaList.add(new UnaryMeasurementSchema("s3", TSDataType.DOUBLE)); Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100); // Method 1 to add tablet data long timestamp = System.currentTimeMillis(); + // for (long row = 0; row < 2_5920_0000; row++) { for (long row = 0; row < 2_5920_0000; row++) { int rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, timestamp); for (int s = 0; s < 3; s++) { - long value = new Random().nextLong(); + double value = new Random().nextDouble(); tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); } if (tablet.rowSize == tablet.getMaxRowNumber()) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java index 48b94f9..3b04bb6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java @@ -87,6 +87,7 @@ public class UDTFExecutor { try { udtf.transform(rowWindow, collector); } catch (Exception e) { + e.printStackTrace(); onError("transform(RowWindow, PointCollector)", e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java index ee5d246..c648444 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java @@ -157,6 +157,7 @@ public class RawQueryInputLayer { cachedRowRecord = null; safetyPile.moveForwardTo(currentRowIndex + 1); + updateRowRecordListEvictionUpperBound(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java index 3dc3e34..be5d41c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java @@ -144,6 +144,8 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi @Override public void readyForNext() { hasCached = false; + + tvList.setEvictionUpperBound(beginIndex + 1); } @Override @@ -234,6 +236,8 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi public void readyForNext() { hasCached = false; nextWindowTimeBegin += slidingStep; + + tvList.setEvictionUpperBound(nextIndexBegin + 1); } @Override
