This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new d701495  [CALCITE-4176] Key descriptor can be optional in SESSION table function
d701495 is described below

commit d701495640df77992b43093422e9331de103d9fd
Author: davonliu <[email protected]>
AuthorDate: Tue Sep 22 20:08:53 2020 +0800

    [CALCITE-4176] Key descriptor can be optional in SESSION table function
    
    Fix style
    
    Fix style
    
    Adding tests to check time column for TUMBLE/HOP table function
    
    Fix style
---
 .../apache/calcite/sql/SqlHopTableFunction.java    |  3 +
 .../calcite/sql/SqlSessionTableFunction.java       | 31 +++++++---
 .../apache/calcite/sql/SqlTumbleTableFunction.java |  3 +
 .../apache/calcite/sql/SqlWindowTableFunction.java | 24 ++++++++
 .../org/apache/calcite/test/SqlValidatorTest.java  | 66 ++++++++++++++++++----
 5 files changed, 110 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java 
b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
index d08dd2e..8c616b0 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -48,6 +48,9 @@ public class SqlHopTableFunction extends 
SqlWindowTableFunction {
       if (!checkTableAndDescriptorOperands(callBinding, 1)) {
         return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
       }
+      if (!checkTimeColumnDescriptorOperand(callBinding, 1)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
+      }
       if (!checkIntervalOperands(callBinding, 2)) {
         return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
       }
diff --git 
a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java 
b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
index f82560f..c031dea 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
@@ -29,7 +29,8 @@ import com.google.common.collect.ImmutableList;
  * <ol>
  *   <li>table as data source</li>
  *   <li>a descriptor to provide a watermarked column name from the input 
table</li>
- *   <li>a descriptor to provide a column as key, on which sessionization will 
be applied</li>
+ *   <li>a descriptor to provide a column as key, on which sessionization will 
be applied,
+ *   optional</li>
  *   <li>an interval parameter to specify a inactive activity gap to break 
sessions</li>
  * </ol>
  */
@@ -42,25 +43,41 @@ public class SqlSessionTableFunction extends 
SqlWindowTableFunction {
   private static class OperandMetadataImpl extends AbstractOperandMetadata {
     OperandMetadataImpl() {
       super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_KEY, PARAM_SIZE),
-          4);
+          3);
     }
 
     @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
         boolean throwOnFailure) {
-      final SqlValidator validator = callBinding.getValidator();
-      if (!checkTableAndDescriptorOperands(callBinding, 2)) {
+      if (!checkTableAndDescriptorOperands(callBinding, 1)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
+      }
+      if (!checkTimeColumnDescriptorOperand(callBinding, 1)) {
         return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
       }
-      final RelDataType type3 = 
validator.getValidatedNodeType(callBinding.operand(3));
-      if (!SqlTypeUtil.isInterval(type3)) {
+
+      final SqlValidator validator = callBinding.getValidator();
+      final SqlNode operand2 = callBinding.operand(2);
+      final RelDataType type2 = validator.getValidatedNodeType(operand2);
+      if (operand2.getKind() == SqlKind.DESCRIPTOR) {
+        final SqlNode operand0 = callBinding.operand(0);
+        final RelDataType type = validator.getValidatedNodeType(operand0);
+        validateColumnNames(
+            validator, type.getFieldNames(), ((SqlCall) 
operand2).getOperandList());
+      } else if (!SqlTypeUtil.isInterval(type2)) {
         return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
       }
+      if (callBinding.getOperandCount() > 3) {
+        final RelDataType type3 = 
validator.getValidatedNodeType(callBinding.operand(3));
+        if (!SqlTypeUtil.isInterval(type3)) {
+          return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
+        }
+      }
       return true;
     }
 
     @Override public String getAllowedSignatures(SqlOperator op, String 
opName) {
       return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
-          + "DESCRIPTOR(key), datetime interval)";
+          + "DESCRIPTOR(key) optional, datetime interval)";
     }
   }
 }
diff --git 
a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java 
b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
index 90d2193..25a8574 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
@@ -49,6 +49,9 @@ public class SqlTumbleTableFunction extends 
SqlWindowTableFunction {
       if (!checkTableAndDescriptorOperands(callBinding, 1)) {
         return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
       }
+      if (!checkTimeColumnDescriptorOperand(callBinding, 1)) {
+        return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
+      }
       if (!checkIntervalOperands(callBinding, 2)) {
         return throwValidationSignatureErrorOrReturnFalse(callBinding, 
throwOnFailure);
       }
diff --git 
a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java 
b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
index 8af4b77..e9d4494 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
@@ -18,6 +18,7 @@ package org.apache.calcite.sql;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
 import org.apache.calcite.sql.type.SqlOperandMetadata;
@@ -189,6 +190,29 @@ public class SqlWindowTableFunction extends SqlFunction
     }
 
     /**
+     * Checks whether the type that the operand of time col descriptor refers 
to is valid.
+     *
+     * @param callBinding The call binding
+     * @param pos The position of the descriptor at the operands of the call
+     * @return true if validation passes, false otherwise
+     */
+    boolean checkTimeColumnDescriptorOperand(SqlCallBinding callBinding, int 
pos) {
+      SqlValidator validator = callBinding.getValidator();
+      SqlNode operand0 = callBinding.operand(0);
+      RelDataType type = validator.getValidatedNodeType(operand0);
+      List<SqlNode> operands = ((SqlCall) 
callBinding.operand(pos)).getOperandList();
+      SqlIdentifier identifier = (SqlIdentifier) operands.get(0);
+      String columnName = identifier.getSimple();
+      SqlNameMatcher matcher = validator.getCatalogReader().nameMatcher();
+      for (RelDataTypeField field : type.getFieldList()) {
+        if (matcher.matches(field.getName(), columnName)) {
+          return SqlTypeUtil.isTimestamp(field.getType());
+        }
+      }
+      return false;
+    }
+
+    /**
      * Checks whether the operands starting from position {@code startPos} are
      * all of type {@code INTERVAL}, returning whether successful.
      *
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java 
b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index fc82a77..8b1fdb0 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10394,6 +10394,23 @@ public class SqlValidatorTest extends 
SqlValidatorTestCase {
     // negative tests.
     sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
         + "from table(\n"
+        + "^tumble(table orders, descriptor(productid), interval '2' hour)^)")
+        .fails("Cannot apply 'TUMBLE' to arguments of type 
'TUMBLE\\(<RECORDTYPE\\"
+            + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER 
ORDERID\\)>, <COLUMN_LIST>, "
+            + "<INTERVAL HOUR>\\)'\\. Supported form\\(s\\): TUMBLE\\(TABLE 
table_name, "
+            + "DESCRIPTOR\\(col1, col2 \\.\\.\\.\\), datetime interval\\[, 
datetime interval\\]\\)");
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "^tumble(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(productid),\n"
+        + "size => interval '2' hour)^)")
+        .fails("Cannot apply 'TUMBLE' to arguments of type 
'TUMBLE\\(<RECORDTYPE\\"
+            + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER 
ORDERID\\)>, <COLUMN_LIST>, "
+            + "<INTERVAL HOUR>\\)'\\. Supported form\\(s\\): TUMBLE\\(TABLE 
table_name, "
+            + "DESCRIPTOR\\(col1, col2 \\.\\.\\.\\), datetime interval\\[, 
datetime interval\\]\\)");
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
         + "tumble(\n"
         + "^\"data\"^ => table orders,\n"
         + "TIMECOL => descriptor(rowtime),\n"
@@ -10455,6 +10472,26 @@ public class SqlValidatorTest extends 
SqlValidatorTestCase {
         + "size => interval '1' hour,\n"
         + "\"OFFSET\" => interval '20' minute))").ok();
     // negative tests.
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "^hop(table orders, descriptor(productid), interval '2' hour, 
interval '1' hour)^)")
+        .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\"
+            + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER 
ORDERID\\)>, <COLUMN_LIST>, "
+            + "<INTERVAL HOUR>, <INTERVAL HOUR>\\)'\\. Supported form\\(s\\): "
+            + "HOP\\(TABLE table_name, DESCRIPTOR\\(timecol\\), "
+            + "datetime interval, datetime interval\\[, datetime 
interval\\]\\)");
+    sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+        + "from table(\n"
+        + "^hop(\n"
+        + "data => table orders,\n"
+        + "timecol => descriptor(productid),\n"
+        + "size => interval '2' hour,\n"
+        + "slide => interval '1' hour)^)")
+        .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\"
+            + "(TIMESTAMP\\(0\\) ROWTIME, INTEGER PRODUCTID, INTEGER 
ORDERID\\)>, <COLUMN_LIST>, "
+            + "<INTERVAL HOUR>, <INTERVAL HOUR>\\)'\\. Supported form\\(s\\): "
+            + "HOP\\(TABLE table_name, DESCRIPTOR\\(timecol\\), "
+            + "datetime interval, datetime interval\\[, datetime 
interval\\]\\)");
     sql("select * from table(\n"
         + "hop(\n"
         + "^\"data\"^ => table orders,\n"
@@ -10504,6 +10541,9 @@ public class SqlValidatorTest extends 
SqlValidatorTestCase {
     sql("select * from table(\n"
         + "session(table orders, descriptor(rowtime), descriptor(productid), 
interval '1' hour))")
         .ok();
+    // test without key descriptor
+    sql("select * from table(\n"
+        + "session(table orders, descriptor(rowtime), interval '2' 
hour))").ok();
     // test named params.
     sql("select * from table(\n"
         + "session(\n"
@@ -10512,41 +10552,47 @@ public class SqlValidatorTest extends 
SqlValidatorTestCase {
         + "key => descriptor(productid),\n"
         + "size => interval '1' hour))")
         .ok();
-    // negative tests.
     sql("select * from table(\n"
         + "session(\n"
-        + "^\"data\"^ => table orders,\n"
+        + "data => table orders,\n"
         + "timecol => descriptor(rowtime),\n"
-        + "key => descriptor(productid),\n"
         + "size => interval '1' hour))")
-        .fails("Param 'data' not found in function 'SESSION'; did you mean 
'DATA'\\?");
+        .ok();
+    // negative tests.
     sql("select * from table(\n"
         + "^session(\n"
         + "data => table orders,\n"
         + "key => descriptor(productid),\n"
         + "size => interval '1' hour)^)")
-        .fails("Invalid number of arguments to function 'SESSION'. Was 
expecting 4 arguments");
+        .fails("Cannot apply 'SESSION' to arguments of type 
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+            + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, 
<COLUMN_LIST>, "
+            + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE 
table_name, DESCRIPTOR\\("
+            + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime 
interval\\)");
     sql("select * from table(\n"
-        + "^session(table orders, descriptor(rowtime), interval '2' hour)^)")
-        .fails("Invalid number of arguments to function 'SESSION'. Was 
expecting 4 arguments");
+        + "session(\n"
+        + "^\"data\"^ => table orders,\n"
+        + "timecol => descriptor(rowtime),\n"
+        + "key => descriptor(productid),\n"
+        + "size => interval '1' hour))")
+        .fails("Param 'data' not found in function 'SESSION'; did you mean 
'DATA'\\?");
     sql("select * from table(\n"
         + "^session(table orders, descriptor(rowtime), descriptor(productid), 
'test')^)")
         .fails("Cannot apply 'SESSION' to arguments of type 
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
             + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, 
<COLUMN_LIST>, <COLUMN_LIST>, "
             + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): SESSION\\(TABLE 
table_name, DESCRIPTOR\\("
-            + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
+            + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime 
interval\\)");
     sql("select * from table(\n"
         + "^session(table orders, descriptor(rowtime), 'test', interval '2' 
hour)^)")
         .fails("Cannot apply 'SESSION' to arguments of type 
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
             + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, 
<COLUMN_LIST>, <CHAR\\(4\\)>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE 
table_name, DESCRIPTOR\\("
-            + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
+            + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime 
interval\\)");
     sql("select * from table(\n"
         + "^session(table orders, 'test', descriptor(productid), interval '2' 
hour)^)")
         .fails("Cannot apply 'SESSION' to arguments of type 
'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
             + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, 
<CHAR\\(4\\)>, <COLUMN_LIST>, "
             + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE 
table_name, DESCRIPTOR\\("
-            + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
+            + "timecol\\), DESCRIPTOR\\(key\\) optional, datetime 
interval\\)");
     sql("select * from table(\n"
         + "session(TABLE ^tabler_not_exist^, descriptor(rowtime), 
descriptor(productid), interval '1' hour))")
         .fails("Object 'TABLER_NOT_EXIST' not found");

Reply via email to