This is an automated email from the ASF dual-hosted git repository.
srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 179d394 Samza-2287: Removing RegEx based matching while parsing the
sql statement (#1121)
179d394 is described below
commit 179d3945918590298bfc4d5d1529a0c8dd316eb4
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Wed Jul 31 14:52:03 2019 -0700
Samza-2287: Removing RegEx based matching while parsing the sql statement
(#1121)
* Removing RegEx while parsing the sql statement
* Adding a few more parser tests
* Adding verbose logging
---
.../apache/samza/sql/util/SamzaSqlQueryParser.java | 23 +++++-----
.../samza/sql/util/TestSamzaSqlQueryParser.java | 49 +++++++++++++++++++++-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 2 +-
3 files changed, 61 insertions(+), 13 deletions(-)
diff --git
a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index e81abfa..d2ed991 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -53,12 +53,15 @@ import org.apache.calcite.tools.Planner;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.interfaces.SamzaSqlDriver;
import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utility class that is used to parse the Samza sql query to figure out the
sources, sink etc..
*/
public class SamzaSqlQueryParser {
+ private static final Logger LOG =
LoggerFactory.getLogger(SamzaSqlQueryParser.class);
private SamzaSqlQueryParser() {
}
@@ -94,13 +97,6 @@ public class SamzaSqlQueryParser {
}
public static QueryInfo parseQuery(String sql) {
-
- Pattern insertIntoSqlPattern = Pattern.compile("insert into (.*) (select
.* from (.*))", Pattern.CASE_INSENSITIVE);
- Matcher m = insertIntoSqlPattern.matcher(sql);
- if (!m.matches()) {
- throw new SamzaException("Invalid query format");
- }
-
Planner planner = createPlanner();
SqlNode sqlNode;
try {
@@ -117,13 +113,20 @@ public class SamzaSqlQueryParser {
sink = sqlInsert.getTargetTable().toString();
if (sqlInsert.getSource() instanceof SqlSelect) {
SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
- selectQuery = m.group(2);
+ selectQuery = sqlSelect.toString();
+ LOG.info("Parsed select query {} from sql {}", selectQuery, sql);
sources = getSourcesFromSelectQuery(sqlSelect);
} else {
- throw new SamzaException("Sql query is not of the expected format");
+ String msg = String.format("Sql query is not of the expected format.
Select node expected, found %s",
+ sqlInsert.getSource().getClass().toString());
+ LOG.error(msg);
+ throw new SamzaException(msg);
}
} else {
- throw new SamzaException("Sql query is not of the expected format");
+ String msg = String.format("Sql query is not of the expected format.
Insert node expected, found %s",
+ sqlNode.getClass().toString());
+ LOG.error(msg);
+ throw new SamzaException(msg);
}
return new QueryInfo(selectQuery, sources, sink, sql);
diff --git
a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
index f15c2f6..a01c32a 100644
---
a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
+++
b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
@@ -21,9 +21,9 @@ package org.apache.samza.sql.util;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Assert;
import org.junit.Test;
-import junit.framework.Assert;
public class TestSamzaSqlQueryParser {
@@ -31,12 +31,57 @@ public class TestSamzaSqlQueryParser {
public void testParseQuery() {
QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo
select * from tracking.bar");
Assert.assertEquals("log.foo", queryInfo.getSink());
- Assert.assertEquals(queryInfo.getSelectQuery(), "select * from
tracking.bar", queryInfo.getSelectQuery());
Assert.assertEquals(1, queryInfo.getSources().size());
Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
}
@Test
+ public void testParseGroupyByQuery() {
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo
select b.pageKey, count(*) from tracking.bar as b group by b.pageKey");
+ Assert.assertEquals("log.foo", queryInfo.getSink());
+ Assert.assertEquals(1, queryInfo.getSources().size());
+ Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+ }
+
+ @Test
+ public void testParseUnNestSubQuery() {
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo
SELECT * FROM unnest(SELECT int_array_field1 FROM tracking.bar) ");
+ Assert.assertEquals("log.foo", queryInfo.getSink());
+ Assert.assertEquals(1, queryInfo.getSources().size());
+ Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+ }
+
+ @Test
+ public void testParseJoinSubQuery() {
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic"
+ + " select p.name as profileName, pv.pageKey"
+ + " from (SELECT * FROM testavro.PAGEVIEW pv1 where
pv1.field1='foo') as pv"
+ + " join testavro.PROFILE.`$table` as p"
+ + " on p.id = pv.profileId";
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+ Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+ Assert.assertEquals(2, queryInfo.getSources().size());
+ Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+ Assert.assertEquals("testavro.PROFILE.$table",
queryInfo.getSources().get(1));
+ }
+
+ @Test
+ public void testParseJoinUnNestQuery() {
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic"
+ + " select p.name as profileName, pv.pageKey"
+ + " from unnest(SELECT int_array_field1 FROM testavro.PAGEVIEW) as
pv"
+ + " join testavro.PROFILE.`$table` as p"
+ + " on p.id = pv.profileId";
+ QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+ Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+ Assert.assertEquals(2, queryInfo.getSources().size());
+ Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+ Assert.assertEquals("testavro.PROFILE.$table",
queryInfo.getSources().get(1));
+ }
+
+ @Test
public void testParseJoinQuery() {
String sql =
"Insert into testavro.enrichedPageViewTopic"
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index aef7926..51f3d93 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -266,7 +266,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
- + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP)
+ MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
+ + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(),
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
runApplication(new MapConfig(staticConfigs));