This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new d701495 [CALCITE-4176] Key descriptor can be optional in SESSION
table function
d701495 is described below
commit d701495640df77992b43093422e9331de103d9fd
Author: davonliu <[email protected]>
AuthorDate: Tue Sep 22 20:08:53 2020 +0800
[CALCITE-4176] Key descriptor can be optional in SESSION table function
Fix style
Fix style
Adding tests to check time column for TUMBLE/HOP table function
Fix style
---
.../apache/calcite/sql/SqlHopTableFunction.java | 3 +
.../calcite/sql/SqlSessionTableFunction.java | 31 +++++++---
.../apache/calcite/sql/SqlTumbleTableFunction.java | 3 +
.../apache/calcite/sql/SqlWindowTableFunction.java | 24 ++++++++
.../org/apache/calcite/test/SqlValidatorTest.java | 66 ++++++++++++++++++----
5 files changed, 110 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
index d08dd2e..8c616b0 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -48,6 +48,9 @@ public class SqlHopTableFunction extends
SqlWindowTableFunction {
if (!checkTableAndDescriptorOperands(callBinding, 1)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
}
+ if (!checkTimeColumnDescriptorOperand(callBinding, 1)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
+ }
if (!checkIntervalOperands(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
}
diff --git
a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
index f82560f..c031dea 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
@@ -29,7 +29,8 @@ import com.google.common.collect.ImmutableList;
* <ol>
* <li>table as data source</li>
* <li>a descriptor to provide a watermarked column name from the input
table</li>
- * <li>a descriptor to provide a column as key, on which sessionization will
be applied</li>
+ * <li>a descriptor to provide a column as key, on which sessionization will
be applied,
+ * optional</li>
* <li>an interval parameter to specify a inactive activity gap to break
sessions</li>
* </ol>
*/
@@ -42,25 +43,41 @@ public class SqlSessionTableFunction extends
SqlWindowTableFunction {
private static class OperandMetadataImpl extends AbstractOperandMetadata {
OperandMetadataImpl() {
super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_KEY, PARAM_SIZE),
- 4);
+ 3);
}
@Override public boolean checkOperandTypes(SqlCallBinding callBinding,
boolean throwOnFailure) {
- final SqlValidator validator = callBinding.getValidator();
- if (!checkTableAndDescriptorOperands(callBinding, 2)) {
+ if (!checkTableAndDescriptorOperands(callBinding, 1)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
+ }
+ if (!checkTimeColumnDescriptorOperand(callBinding, 1)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
}
- final RelDataType type3 =
validator.getValidatedNodeType(callBinding.operand(3));
- if (!SqlTypeUtil.isInterval(type3)) {
+
+ final SqlValidator validator = callBinding.getValidator();
+ final SqlNode operand2 = callBinding.operand(2);
+ final RelDataType type2 = validator.getValidatedNodeType(operand2);
+ if (operand2.getKind() == SqlKind.DESCRIPTOR) {
+ final SqlNode operand0 = callBinding.operand(0);
+ final RelDataType type = validator.getValidatedNodeType(operand0);
+ validateColumnNames(
+ validator, type.getFieldNames(), ((SqlCall)
operand2).getOperandList());
+ } else if (!SqlTypeUtil.isInterval(type2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
}
+ if (callBinding.getOperandCount() > 3) {
+ final RelDataType type3 =
validator.getValidatedNodeType(callBinding.operand(3));
+ if (!SqlTypeUtil.isInterval(type3)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
+ }
+ }
return true;
}
@Override public String getAllowedSignatures(SqlOperator op, String
opName) {
return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
- + "DESCRIPTOR(key), datetime interval)";
+ + "DESCRIPTOR(key) optional, datetime interval)";
}
}
}
diff --git
a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
index 90d2193..25a8574 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
@@ -49,6 +49,9 @@ public class SqlTumbleTableFunction extends
SqlWindowTableFunction {
if (!checkTableAndDescriptorOperands(callBinding, 1)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
}
+ if (!checkTimeColumnDescriptorOperand(callBinding, 1)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
+ }
if (!checkIntervalOperands(callBinding, 2)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
}
diff --git
a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
index 8af4b77..e9d4494 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
@@ -18,6 +18,7 @@ package org.apache.calcite.sql;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlOperandMetadata;
@@ -189,6 +190,29 @@ public class SqlWindowTableFunction extends SqlFunction
}
/**
+ * Checks whether the type that the operand of time col descriptor refers
to is valid.
+ *
+ * @param callBinding The call binding
+ * @param pos The position of the descriptor at the operands of the call
+ * @return true if validation passes, false otherwise
+ */
+ boolean checkTimeColumnDescriptorOperand(SqlCallBinding callBinding, int
pos) {
+ SqlValidator validator = callBinding.getValidator();
+ SqlNode operand0 = callBinding.operand(0);
+ RelDataType type = validator.getValidatedNodeType(operand0);
+ List<SqlNode> operands = ((SqlCall)
callBinding.operand(pos)).getOperandList();
+ SqlIdentifier identifier = (SqlIdentifier) operands.get(0);
+ String columnName = identifier.getSimple();
+ SqlNameMatcher matcher = validator.getCatalogReader().nameMatcher();
+ for (RelDataTypeField field : type.getFieldList()) {
+ if (matcher.matches(field.getName(), columnName)) {
+ return SqlTypeUtil.isTimestamp(field.getType());
+ }
+ }
+ return false;
+ }
+
+ /**
* Checks whether the operands starting from position {@code startPos} are
* all of type {@code INTERVAL}, returning whether successful.
*
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index fc82a77..8b1fdb0 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10394,6 +10394,23 @@ public class SqlValidatorTest extends
SqlValidatorTestCase {
// negative tests.
sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ "from table(\n"
+ + "^tumble(table orders, descriptor(productid), interval '2' hour)^)")
+ .fails("Cannot apply 'TUMBLE' to arguments of type
'TUMBLE\\(<RECORDTYPE\\"
+ + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER
ORDERID\\)>, <COLUMN_LIST>, "
+ + "<INTERVAL HOUR>\\)'\\. Supported form\\(s\\): TUMBLE\\(TABLE
table_name, "
+ + "DESCRIPTOR\\(col1, col2 \\.\\.\\.\\), datetime interval\\[,
datetime interval\\]\\)");
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "^tumble(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(productid),\n"
+ + "size => interval '2' hour)^)")
+ .fails("Cannot apply 'TUMBLE' to arguments of type
'TUMBLE\\(<RECORDTYPE\\"
+ + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER
ORDERID\\)>, <COLUMN_LIST>, "
+ + "<INTERVAL HOUR>\\)'\\. Supported form\\(s\\): TUMBLE\\(TABLE
table_name, "
+ + "DESCRIPTOR\\(col1, col2 \\.\\.\\.\\), datetime interval\\[,
datetime interval\\]\\)");
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ "tumble(\n"
+ "^\"data\"^ => table orders,\n"
+ "TIMECOL => descriptor(rowtime),\n"
@@ -10455,6 +10472,26 @@ public class SqlValidatorTest extends
SqlValidatorTestCase {
+ "size => interval '1' hour,\n"
+ "\"OFFSET\" => interval '20' minute))").ok();
// negative tests.
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "^hop(table orders, descriptor(productid), interval '2' hour,
interval '1' hour)^)")
+ .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\"
+ + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER
ORDERID\\)>, <COLUMN_LIST>, "
+ + "<INTERVAL HOUR>, <INTERVAL HOUR>\\)'\\. Supported form\\(s\\): "
+ + "HOP\\(TABLE table_name, DESCRIPTOR\\(timecol\\), "
+ + "datetime interval, datetime interval\\[, datetime
interval\\]\\)");
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "^hop(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(productid),\n"
+ + "size => interval '2' hour,\n"
+ + "slide => interval '1' hour)^)")
+ .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\"
+ + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER
ORDERID\\)>, <COLUMN_LIST>, "
+ + "<INTERVAL HOUR>, <INTERVAL HOUR>\\)'\\. Supported form\\(s\\): "
+ + "HOP\\(TABLE table_name, DESCRIPTOR\\(timecol\\), "
+ + "datetime interval, datetime interval\\[, datetime
interval\\]\\)");
sql("select * from table(\n"
+ "hop(\n"
+ "^\"data\"^ => table orders,\n"
@@ -10504,6 +10541,9 @@ public class SqlValidatorTest extends
SqlValidatorTestCase {
sql("select * from table(\n"
+ "session(table orders, descriptor(rowtime), descriptor(productid),
interval '1' hour))")
.ok();
+ // test without key descriptor
+ sql("select * from table(\n"
+ + "session(table orders, descriptor(rowtime), interval '2'
hour))").ok();
// test named params.
sql("select * from table(\n"
+ "session(\n"
@@ -10512,41 +10552,47 @@ public class SqlValidatorTest extends
SqlValidatorTestCase {
+ "key => descriptor(productid),\n"
+ "size => interval '1' hour))")
.ok();
- // negative tests.
sql("select * from table(\n"
+ "session(\n"
- + "^\"data\"^ => table orders,\n"
+ + "data => table orders,\n"
+ "timecol => descriptor(rowtime),\n"
- + "key => descriptor(productid),\n"
+ "size => interval '1' hour))")
- .fails("Param 'data' not found in function 'SESSION'; did you mean
'DATA'\\?");
+ .ok();
+ // negative tests.
sql("select * from table(\n"
+ "^session(\n"
+ "data => table orders,\n"
+ "key => descriptor(productid),\n"
+ "size => interval '1' hour)^)")
- .fails("Invalid number of arguments to function 'SESSION'. Was
expecting 4 arguments");
+ .fails("Cannot apply 'SESSION' to arguments of type
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>,
<COLUMN_LIST>, "
+ + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE
table_name, DESCRIPTOR\\("
+ + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime
interval\\)");
sql("select * from table(\n"
- + "^session(table orders, descriptor(rowtime), interval '2' hour)^)")
- .fails("Invalid number of arguments to function 'SESSION'. Was
expecting 4 arguments");
+ + "session(\n"
+ + "^\"data\"^ => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "key => descriptor(productid),\n"
+ + "size => interval '1' hour))")
+ .fails("Param 'data' not found in function 'SESSION'; did you mean
'DATA'\\?");
sql("select * from table(\n"
+ "^session(table orders, descriptor(rowtime), descriptor(productid),
'test')^)")
.fails("Cannot apply 'SESSION' to arguments of type
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>,
<COLUMN_LIST>, <COLUMN_LIST>, "
+ "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): SESSION\\(TABLE
table_name, DESCRIPTOR\\("
- + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
+ + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime
interval\\)");
sql("select * from table(\n"
+ "^session(table orders, descriptor(rowtime), 'test', interval '2'
hour)^)")
.fails("Cannot apply 'SESSION' to arguments of type
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>,
<COLUMN_LIST>, <CHAR\\(4\\)>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE
table_name, DESCRIPTOR\\("
- + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
+ + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime
interval\\)");
sql("select * from table(\n"
+ "^session(table orders, 'test', descriptor(productid), interval '2'
hour)^)")
.fails("Cannot apply 'SESSION' to arguments of type
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>,
<CHAR\\(4\\)>, <COLUMN_LIST>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE
table_name, DESCRIPTOR\\("
- + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
+ + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime
interval\\)");
sql("select * from table(\n"
+ "session(TABLE ^tabler_not_exist^, descriptor(rowtime),
descriptor(productid), interval '1' hour))")
.fails("Object 'TABLER_NOT_EXIST' not found");