This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c12f983  Samza-2254: Adding relevant calcite functions to Samza SQL 
(#1084)
c12f983 is described below

commit c12f983f8acdf89befce2806ea31ebc3149c9ab2
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Fri Jun 21 08:24:21 2019 -0700

    Samza-2254: Adding relevant calcite functions to Samza SQL (#1084)
    
    * Adding bunch of relevant functions from calcite to Samza SQL
---
 .../org/apache/samza/sql/planner/QueryPlanner.java |  7 +++-
 .../samza/sql/planner/SamzaSqlOperatorTable.java   | 46 ++++++++++++++++++++++
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 22 ++++++++++-
 3 files changed, 73 insertions(+), 2 deletions(-)

diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 7114098..bbf1770 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -48,6 +48,9 @@ import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
@@ -141,10 +144,12 @@ public class QueryPlanner {
       sqlOperatorTables.add(new SamzaSqlOperatorTable());
       sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
 
+      // Using lenient so that !=,%,- are allowed.
       FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
-          .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
+          
.parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).setConformance(SqlConformanceEnum.LENIENT).build())
           .defaultSchema(rootSchema)
           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+          .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
           .context(Contexts.EMPTY_CONTEXT)
           .costFactory(null)
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index 5a4b780..766925a 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -26,7 +26,14 @@ import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlPostfixOperator;
 import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.fun.SqlArrayValueConstructor;
+import org.apache.calcite.sql.fun.SqlCoalesceFunction;
 import org.apache.calcite.sql.fun.SqlDatePartFunction;
+import org.apache.calcite.sql.fun.SqlMapValueConstructor;
+import org.apache.calcite.sql.fun.SqlMultisetQueryConstructor;
+import org.apache.calcite.sql.fun.SqlMultisetValueConstructor;
+import org.apache.calcite.sql.fun.SqlRowOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
 
@@ -59,28 +66,56 @@ public class SamzaSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
   public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE;
   public static final SqlPostfixOperator IS_NOT_FALSE = 
SqlStdOperatorTable.IS_NOT_FALSE;
   public static final SqlPostfixOperator IS_FALSE = 
SqlStdOperatorTable.IS_FALSE;
+  public static final SqlPostfixOperator IS_EMPTY = 
SqlStdOperatorTable.IS_EMPTY;
+  public static final SqlPostfixOperator IS_NOT_EMPTY = 
SqlStdOperatorTable.IS_NOT_EMPTY;
 
   public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
   public static final SqlOperator DOT = SqlStdOperatorTable.DOT;
+  public static final SqlFunction ELEMENT = SqlStdOperatorTable.ELEMENT;
 
+  public static final SqlPrefixOperator EXISTS = SqlStdOperatorTable.EXISTS;
   public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT;
   public static final SqlPrefixOperator UNARY_MINUS = 
SqlStdOperatorTable.UNARY_MINUS;
   public static final SqlPrefixOperator UNARY_PLUS = 
SqlStdOperatorTable.UNARY_PLUS;
   public static final SqlPrefixOperator EXPLICIT_TABLE = 
SqlStdOperatorTable.EXPLICIT_TABLE;
 
+  public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE;
+  public static final SqlSpecialOperator NOT_LIKE = 
SqlStdOperatorTable.NOT_LIKE;
+  public static final SqlSpecialOperator NOT_SIMILAR_TO = 
SqlStdOperatorTable.NOT_SIMILAR_TO;
+  public static final SqlSpecialOperator SIMILAR_TO = 
SqlStdOperatorTable.SIMILAR_TO;
 
+  public static final SqlFunction OVERLAY = SqlStdOperatorTable.OVERLAY;
+  public static final SqlFunction POSITION = SqlStdOperatorTable.POSITION;
+  public static final SqlFunction CHARACTER_LENGTH = 
SqlStdOperatorTable.CHARACTER_LENGTH;
+  public static final SqlFunction INITCAP = SqlStdOperatorTable.INITCAP;
+  public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
+  public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
   public static final SqlFunction CHAR_LENGTH = 
SqlStdOperatorTable.CHAR_LENGTH;
   public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING;
   public static final SqlFunction REPLACE = SqlStdOperatorTable.REPLACE;
   public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
   public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
   public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
+
+
+  public static final SqlFunction CARDINALITY = 
SqlStdOperatorTable.CARDINALITY;
+
+  public static final SqlMultisetValueConstructor MULTISET_VALUE = 
SqlStdOperatorTable.MULTISET_VALUE;
+  public static final SqlMultisetQueryConstructor MULTISET_QUERY = 
SqlStdOperatorTable.MULTISET_QUERY;
+  public static final SqlMultisetQueryConstructor ARRAY_QUERY = 
SqlStdOperatorTable.ARRAY_QUERY;
+  public static final SqlArrayValueConstructor ARRAY_VALUE_CONSTRUCTOR = 
SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR;
+  public static final SqlMultisetQueryConstructor MAP_QUERY = 
SqlStdOperatorTable.MAP_QUERY;
+  public static final SqlMapValueConstructor MAP_VALUE_CONSTRUCTOR = 
SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR;
+  public static final SqlRowOperator ROW = SqlStdOperatorTable.ROW;
+
   public static final SqlFunction POWER = SqlStdOperatorTable.POWER;
+  public static final SqlFunction ROUND = SqlStdOperatorTable.ROUND;
   public static final SqlFunction SQRT = SqlStdOperatorTable.SQRT;
   public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
   public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
   public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
   public static final SqlFunction EXTRACT = SqlStdOperatorTable.EXTRACT;
+
   public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
   public static final SqlFunction LOCALTIMESTAMP = 
SqlStdOperatorTable.LOCALTIMESTAMP;
   public static final SqlFunction CURRENT_TIME = 
SqlStdOperatorTable.CURRENT_TIME;
@@ -88,8 +123,19 @@ public class SamzaSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
   public static final SqlFunction CURRENT_DATE = 
SqlStdOperatorTable.CURRENT_DATE;
   public static final SqlFunction TIMESTAMP_ADD = 
SqlStdOperatorTable.TIMESTAMP_ADD;
   public static final SqlFunction TIMESTAMP_DIFF = 
SqlStdOperatorTable.TIMESTAMP_DIFF;
+
   public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+
+  public static final SqlDatePartFunction YEAR = SqlStdOperatorTable.YEAR;
   public static final SqlDatePartFunction MONTH = SqlStdOperatorTable.MONTH;
+  public static final SqlDatePartFunction QUARTER = 
SqlStdOperatorTable.QUARTER;
+  public static final SqlDatePartFunction WEEK = SqlStdOperatorTable.WEEK;
+  public static final SqlDatePartFunction DAYOFYEAR = 
SqlStdOperatorTable.DAYOFYEAR;
+  public static final SqlDatePartFunction DAYOFMONTH = 
SqlStdOperatorTable.DAYOFMONTH;
+  public static final SqlDatePartFunction DAYOFWEEK = 
SqlStdOperatorTable.DAYOFWEEK;
+  public static final SqlDatePartFunction HOUR = SqlStdOperatorTable.HOUR;
+  public static final SqlDatePartFunction MINUTE = SqlStdOperatorTable.MINUTE;
+  public static final SqlDatePartFunction SECOND = SqlStdOperatorTable.SECOND;
 
   public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT;
   public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM;
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 0c66d2f..6b95b27 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -338,7 +338,27 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) 
THEN 2 WHEN id < 5 THEN 0 ELSE NULL END as long_value, 'test' || 'foo' as 
string_value from testavro.SIMPLE1";
+        + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) 
THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL 
END as string_value from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) 
x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    Assert.assertTrue(IntStream.range(0, 
numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+  }
+
+  @Test
+  public void testEndToEndWithLike() throws Exception {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+        + " select id, name as string_value from testavro.SIMPLE1 where name 
like 'Name%'";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     runApplication(new MapConfig(staticConfigs));

Reply via email to