This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch add_max_min_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/add_max_min_by by this push:
new b2288869f63 add it
b2288869f63 is described below
commit b2288869f63a97afffc012e46850ee2055da6e49
Author: Beyyes <[email protected]>
AuthorDate: Sat Oct 19 00:37:35 2024 +0800
add it
---
.../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 12 ++
.../aggregation/TableMaxByAccumulator.java | 5 +
.../aggregation/TableMaxMinByBaseAccumulator.java | 184 ++++++++++++++++-----
.../aggregation/TableMinByAccumulator.java | 5 +
4 files changed, 161 insertions(+), 45 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index d7de73b7283..01d62a5c348 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -1050,6 +1050,18 @@ public class IoTDBMultiIDsWithAttributesTableIT {
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
}
+ @Test
+ public void maxByMinByTest() {
+ String[] expectedHeader1 = buildHeaders(10);
+ sql =
+ "select
max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv)
from table0";
+ retArray =
+ new String[] {
+
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
+ };
+ tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
+ }
+
// ==================================================================
// ============================ Join Test ===========================
// ==================================================================
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
index b8124e32421..ef0ebbb2b79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxByAccumulator.java
@@ -56,6 +56,11 @@ public class TableMaxByAccumulator extends
TableMaxMinByBaseAccumulator {
return yValue.compareTo(yExtremeValue) > 0;
}
+ @Override
+ protected boolean check(boolean yValue, boolean yExtremeValue) {
+ return yValue;
+ }
+
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
index 1ce7cc6165b..2b883e3c0ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMaxMinByBaseAccumulator.java
@@ -54,7 +54,7 @@ public abstract class TableMaxMinByBaseAccumulator implements
TableAccumulator {
private boolean initResult;
- private long yTimeStamp = Long.MAX_VALUE;
+ // private long yTimeStamp = Long.MAX_VALUE;
private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data
type in MaxBy/MinBy: %s";
@@ -65,7 +65,7 @@ public abstract class TableMaxMinByBaseAccumulator implements
TableAccumulator {
this.yExtremeValue = TsPrimitiveType.getByType(yDataType);
}
- // Column should be like: | Time | x | y |
+ // Column should be like: | x | y |
@Override
public void addInput(Column[] arguments) {
checkArgument(arguments.length == 2, "Length of input Column[] for
MaxBy/MinBy should be 2");
@@ -75,21 +75,23 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
addIntInput(arguments);
return;
case INT64:
- // case TIMESTAMP:
- // addLongInput(arguments);
- // return;
- // case FLOAT:
- // addFloatInput(arguments);
- // return;
- // case DOUBLE:
- // addDoubleInput(arguments);
- // return;
- // case STRING:
- // addBinaryInput(arguments);
- // return;
+ case TIMESTAMP:
+ addLongInput(arguments);
+ return;
+ case FLOAT:
+ addFloatInput(arguments);
+ return;
+ case DOUBLE:
+ addDoubleInput(arguments);
+ return;
+ case STRING:
case TEXT:
case BLOB:
+ addBinaryInput(arguments);
+ return;
case BOOLEAN:
+ addBooleanInput(arguments);
+ return;
default:
throw new
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE,
yDataType));
}
@@ -144,7 +146,7 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
xNull = true;
this.xResult.reset();
this.yExtremeValue.reset();
- yTimeStamp = Long.MAX_VALUE;
+ // yTimeStamp = Long.MAX_VALUE;
}
@Override
@@ -169,6 +171,91 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
}
}
+ private void addLongInput(Column[] column) {
+ int count = column[1].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (!column[1].isNull(i)) {
+ updateLongResult(column[1].getLong(i), column[0], i);
+ }
+ }
+ }
+
+ private void updateLongResult(long yValue, Column xColumn, int xIndex) {
+ if (!initResult || check(yValue, yExtremeValue.getLong())) {
+ initResult = true;
+ yExtremeValue.setLong(yValue);
+ updateX(xColumn, xIndex);
+ }
+ }
+
+ private void addFloatInput(Column[] column) {
+ int count = column[1].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (!column[1].isNull(i)) {
+ updateFloatResult(column[1].getFloat(i), column[0], i);
+ }
+ }
+ }
+
+ private void updateFloatResult(float yValue, Column xColumn, int xIndex) {
+ if (!initResult || check(yValue, yExtremeValue.getFloat())) {
+ initResult = true;
+ yExtremeValue.setFloat(yValue);
+ updateX(xColumn, xIndex);
+ }
+ }
+
+ private void addDoubleInput(Column[] column) {
+ int count = column[1].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (!column[1].isNull(i)) {
+ updateDoubleResult(column[1].getDouble(i), column[0], i);
+ }
+ }
+ }
+
+ private void updateDoubleResult(double yValue, Column xColumn, int xIndex) {
+ if (!initResult || check(yValue, yExtremeValue.getDouble())) {
+ initResult = true;
+ yExtremeValue.setDouble(yValue);
+ updateX(xColumn, xIndex);
+ }
+ }
+
+ private void addBinaryInput(Column[] column) {
+ int count = column[1].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (!column[1].isNull(i)) {
+ updateBinaryResult(column[1].getBinary(i), column[0], i);
+ }
+ }
+ }
+
+ private void updateBinaryResult(Binary yValue, Column xColumn, int xIndex) {
+ if (!initResult || check(yValue, yExtremeValue.getBinary())) {
+ initResult = true;
+ yExtremeValue.setBinary(yValue);
+ updateX(xColumn, xIndex);
+ }
+ }
+
+ private void addBooleanInput(Column[] column) {
+ int count = column[1].getPositionCount();
+ for (int i = 0; i < count; i++) {
+ if (!column[1].isNull(i)) {
+ updateBooleanResult(column[1].getBoolean(i), column[0], i);
+ }
+ }
+ }
+
+ private void updateBooleanResult(boolean yValue, Column xColumn, int xIndex)
{
+ if (!initResult || check(yValue, yExtremeValue.getBoolean())) {
+ initResult = true;
+ yExtremeValue.setBoolean(yValue);
+ updateX(xColumn, xIndex);
+ }
+ }
+
private void writeX(ColumnBuilder columnBuilder) {
if (xNull) {
columnBuilder.appendNull();
@@ -241,7 +328,7 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
try {
- dataOutputStream.writeLong(yTimeStamp);
+ // dataOutputStream.writeLong(yTimeStamp);
writeIntermediateToStream(yDataType, yExtremeValue, dataOutputStream);
dataOutputStream.writeBoolean(xNull);
if (!xNull) {
@@ -288,8 +375,8 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
}
private void updateFromBytesIntermediateInput(byte[] bytes) {
- long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
- int offset = Long.BYTES;
+ // long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
+ int offset = 0;
// Use Column to store x value
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(xDataType));
ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
@@ -301,36 +388,41 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
updateIntResult(intMaxVal, columnBuilder.build(), 0);
break;
- // case INT64:
- // case TIMESTAMP:
- // long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes,
Long.BYTES, offset);
- // offset += Long.BYTES;
- // readXFromBytesIntermediateInput(bytes, offset,
columnBuilder);
- // updateLongResult(time, longMaxVal, columnBuilder.build(), 0);
- // break;
- // case FLOAT:
- // float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
- // offset += Float.BYTES;
- // readXFromBytesIntermediateInput(bytes, offset,
columnBuilder);
- // updateFloatResult(time, floatMaxVal, columnBuilder.build(),
0);
- // break;
- // case DOUBLE:
- // double doubleMaxVal = BytesUtils.bytesToDouble(bytes,
offset);
- // offset += Long.BYTES;
- // readXFromBytesIntermediateInput(bytes, offset,
columnBuilder);
- // updateDoubleResult(time, doubleMaxVal,
columnBuilder.build(), 0);
- // break;
- // case STRING:
- // int length = BytesUtils.bytesToInt(bytes, offset);
- // offset += Integer.BYTES;
- // Binary binaryMaxVal = new Binary(BytesUtils.subBytes(bytes,
offset, length));
- // offset += length;
- // readXFromBytesIntermediateInput(bytes, offset,
columnBuilder);
- // updateBinaryResult(time, binaryMaxVal,
columnBuilder.build(), 0);
- // break;
+ case INT64:
+ case TIMESTAMP:
+ long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES,
offset);
+ offset += Long.BYTES;
+ readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+ updateLongResult(longMaxVal, columnBuilder.build(), 0);
+ break;
+ case FLOAT:
+ float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
+ offset += Float.BYTES;
+ readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+ updateFloatResult(floatMaxVal, columnBuilder.build(), 0);
+ break;
+ case DOUBLE:
+ double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset);
+ offset += Long.BYTES;
+ readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+ updateDoubleResult(doubleMaxVal, columnBuilder.build(), 0);
+ break;
+ case STRING:
case TEXT:
case BLOB:
+ int length = BytesUtils.bytesToInt(bytes, offset);
+ offset += Integer.BYTES;
+ Binary binaryMaxVal = new Binary(BytesUtils.subBytes(bytes, offset,
length));
+ offset += length;
+ readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+ updateBinaryResult(binaryMaxVal, columnBuilder.build(), 0);
+ break;
case BOOLEAN:
+ boolean booleanMaxVal = BytesUtils.bytesToBool(bytes, offset);
+ offset += 1;
+ readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
+ updateBooleanResult(booleanMaxVal, columnBuilder.build(), 0);
+ break;
default:
throw new
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE,
yDataType));
}
@@ -389,4 +481,6 @@ public abstract class TableMaxMinByBaseAccumulator
implements TableAccumulator {
protected abstract boolean check(double yValue, double yExtremeValue);
protected abstract boolean check(Binary yValue, Binary yExtremeValue);
+
+ protected abstract boolean check(boolean yValue, boolean yExtremeValue);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
index 3b6d32195c8..d4fbadae512 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableMinByAccumulator.java
@@ -56,6 +56,11 @@ public class TableMinByAccumulator extends
TableMaxMinByBaseAccumulator {
return yValue.compareTo(yExtremeValue) < 0;
}
+ @Override
+ protected boolean check(boolean yValue, boolean yExtremeValue) {
+ return !yValue;
+ }
+
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE;