This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c12f983 Samza-2254: Adding relevant calcite functions to Samza SQL
(#1084)
c12f983 is described below
commit c12f983f8acdf89befce2806ea31ebc3149c9ab2
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Fri Jun 21 08:24:21 2019 -0700
Samza-2254: Adding relevant calcite functions to Samza SQL (#1084)
* Adding a bunch of relevant functions from Calcite to Samza SQL
---
.../org/apache/samza/sql/planner/QueryPlanner.java | 7 +++-
.../samza/sql/planner/SamzaSqlOperatorTable.java | 46 ++++++++++++++++++++++
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 22 ++++++++++-
3 files changed, 73 insertions(+), 2 deletions(-)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 7114098..bbf1770 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -48,6 +48,9 @@ import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
@@ -141,10 +144,12 @@ public class QueryPlanner {
sqlOperatorTables.add(new SamzaSqlOperatorTable());
sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+ // Use LENIENT SQL conformance so that !=, % and - are allowed.
FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).build())
+
.parserConfig(SqlParser.configBuilder().setLex(Lex.JAVA).setConformance(SqlConformanceEnum.LENIENT).build())
.defaultSchema(rootSchema)
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
.traitDefs(traitDefs)
.context(Contexts.EMPTY_CONTEXT)
.costFactory(null)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index 5a4b780..766925a 100644
---
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -26,7 +26,14 @@ import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.fun.SqlArrayValueConstructor;
+import org.apache.calcite.sql.fun.SqlCoalesceFunction;
import org.apache.calcite.sql.fun.SqlDatePartFunction;
+import org.apache.calcite.sql.fun.SqlMapValueConstructor;
+import org.apache.calcite.sql.fun.SqlMultisetQueryConstructor;
+import org.apache.calcite.sql.fun.SqlMultisetValueConstructor;
+import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
@@ -59,28 +66,56 @@ public class SamzaSqlOperatorTable extends
ReflectiveSqlOperatorTable {
public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE;
public static final SqlPostfixOperator IS_NOT_FALSE =
SqlStdOperatorTable.IS_NOT_FALSE;
public static final SqlPostfixOperator IS_FALSE =
SqlStdOperatorTable.IS_FALSE;
+ public static final SqlPostfixOperator IS_EMPTY =
SqlStdOperatorTable.IS_EMPTY;
+ public static final SqlPostfixOperator IS_NOT_EMPTY =
SqlStdOperatorTable.IS_NOT_EMPTY;
public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
public static final SqlOperator DOT = SqlStdOperatorTable.DOT;
+ public static final SqlFunction ELEMENT = SqlStdOperatorTable.ELEMENT;
+ public static final SqlPrefixOperator EXISTS = SqlStdOperatorTable.EXISTS;
public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT;
public static final SqlPrefixOperator UNARY_MINUS =
SqlStdOperatorTable.UNARY_MINUS;
public static final SqlPrefixOperator UNARY_PLUS =
SqlStdOperatorTable.UNARY_PLUS;
public static final SqlPrefixOperator EXPLICIT_TABLE =
SqlStdOperatorTable.EXPLICIT_TABLE;
+ public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE;
+ public static final SqlSpecialOperator NOT_LIKE =
SqlStdOperatorTable.NOT_LIKE;
+ public static final SqlSpecialOperator NOT_SIMILAR_TO =
SqlStdOperatorTable.NOT_SIMILAR_TO;
+ public static final SqlSpecialOperator SIMILAR_TO =
SqlStdOperatorTable.SIMILAR_TO;
+ public static final SqlFunction OVERLAY = SqlStdOperatorTable.OVERLAY;
+ public static final SqlFunction POSITION = SqlStdOperatorTable.POSITION;
+ public static final SqlFunction CHARACTER_LENGTH =
SqlStdOperatorTable.CHARACTER_LENGTH;
+ public static final SqlFunction INITCAP = SqlStdOperatorTable.INITCAP;
+ public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
+ public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
public static final SqlFunction CHAR_LENGTH =
SqlStdOperatorTable.CHAR_LENGTH;
public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING;
public static final SqlFunction REPLACE = SqlStdOperatorTable.REPLACE;
public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
+
+
+ public static final SqlFunction CARDINALITY =
SqlStdOperatorTable.CARDINALITY;
+
+ public static final SqlMultisetValueConstructor MULTISET_VALUE =
SqlStdOperatorTable.MULTISET_VALUE;
+ public static final SqlMultisetQueryConstructor MULTISET_QUERY =
SqlStdOperatorTable.MULTISET_QUERY;
+ public static final SqlMultisetQueryConstructor ARRAY_QUERY =
SqlStdOperatorTable.ARRAY_QUERY;
+ public static final SqlArrayValueConstructor ARRAY_VALUE_CONSTRUCTOR =
SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR;
+ public static final SqlMultisetQueryConstructor MAP_QUERY =
SqlStdOperatorTable.MAP_QUERY;
+ public static final SqlMapValueConstructor MAP_VALUE_CONSTRUCTOR =
SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR;
+ public static final SqlRowOperator ROW = SqlStdOperatorTable.ROW;
+
public static final SqlFunction POWER = SqlStdOperatorTable.POWER;
+ public static final SqlFunction ROUND = SqlStdOperatorTable.ROUND;
public static final SqlFunction SQRT = SqlStdOperatorTable.SQRT;
public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
public static final SqlFunction EXTRACT = SqlStdOperatorTable.EXTRACT;
+
public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
public static final SqlFunction LOCALTIMESTAMP =
SqlStdOperatorTable.LOCALTIMESTAMP;
public static final SqlFunction CURRENT_TIME =
SqlStdOperatorTable.CURRENT_TIME;
@@ -88,8 +123,19 @@ public class SamzaSqlOperatorTable extends
ReflectiveSqlOperatorTable {
public static final SqlFunction CURRENT_DATE =
SqlStdOperatorTable.CURRENT_DATE;
public static final SqlFunction TIMESTAMP_ADD =
SqlStdOperatorTable.TIMESTAMP_ADD;
public static final SqlFunction TIMESTAMP_DIFF =
SqlStdOperatorTable.TIMESTAMP_DIFF;
+
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+
+ public static final SqlDatePartFunction YEAR = SqlStdOperatorTable.YEAR;
public static final SqlDatePartFunction MONTH = SqlStdOperatorTable.MONTH;
+ public static final SqlDatePartFunction QUARTER =
SqlStdOperatorTable.QUARTER;
+ public static final SqlDatePartFunction WEEK = SqlStdOperatorTable.WEEK;
+ public static final SqlDatePartFunction DAYOFYEAR =
SqlStdOperatorTable.DAYOFYEAR;
+ public static final SqlDatePartFunction DAYOFMONTH =
SqlStdOperatorTable.DAYOFMONTH;
+ public static final SqlDatePartFunction DAYOFWEEK =
SqlStdOperatorTable.DAYOFWEEK;
+ public static final SqlDatePartFunction HOUR = SqlStdOperatorTable.HOUR;
+ public static final SqlDatePartFunction MINUTE = SqlStdOperatorTable.MINUTE;
+ public static final SqlDatePartFunction SECOND = SqlStdOperatorTable.SECOND;
public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT;
public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM;
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 0c66d2f..6b95b27 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -338,7 +338,27 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7)
THEN 2 WHEN id < 5 THEN 0 ELSE NULL END as long_value, 'test' || 'foo' as
string_value from testavro.SIMPLE1";
+ + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7)
THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL
END as string_value from testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql1);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
+ runApplication(new MapConfig(staticConfigs));
+
+ List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> Integer.valueOf(((GenericRecord)
x.getMessage()).get("id").toString()))
+ .sorted()
+ .collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ Assert.assertTrue(IntStream.range(0,
numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+ }
+
+ @Test
+ public void testEndToEndWithLike() throws Exception {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ + " select id, name as string_value from testavro.SIMPLE1 where name
like 'Name%'";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
runApplication(new MapConfig(staticConfigs));