This is an automated email from the ASF dual-hosted git repository.

srinivasulu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 179d394  Samza-2287: Removing RegEx based matching while parsing the sql statement (#1121)
179d394 is described below

commit 179d3945918590298bfc4d5d1529a0c8dd316eb4
Author: Srinivasulu Punuru <[email protected]>
AuthorDate: Wed Jul 31 14:52:03 2019 -0700

    Samza-2287: Removing RegEx based matching while parsing the sql statement (#1121)
    
    * Removing RegEx while parsing the sql statement
    
    * Adding a few more parser tests
    
    * Adding verbose logging
---
 .../apache/samza/sql/util/SamzaSqlQueryParser.java | 23 +++++-----
 .../samza/sql/util/TestSamzaSqlQueryParser.java    | 49 +++++++++++++++++++++-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  |  2 +-
 3 files changed, 61 insertions(+), 13 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index e81abfa..d2ed991 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -53,12 +53,15 @@ import org.apache.calcite.tools.Planner;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.interfaces.SamzaSqlDriver;
 import org.apache.samza.sql.interfaces.SamzaSqlJavaTypeFactoryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Utility class that is used to parse the Samza sql query to figure out the 
sources, sink etc..
  */
 public class SamzaSqlQueryParser {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaSqlQueryParser.class);
 
   private SamzaSqlQueryParser() {
   }
@@ -94,13 +97,6 @@ public class SamzaSqlQueryParser {
   }
 
   public static QueryInfo parseQuery(String sql) {
-
-    Pattern insertIntoSqlPattern = Pattern.compile("insert into (.*) (select 
.* from (.*))", Pattern.CASE_INSENSITIVE);
-    Matcher m = insertIntoSqlPattern.matcher(sql);
-    if (!m.matches()) {
-      throw new SamzaException("Invalid query format");
-    }
-
     Planner planner = createPlanner();
     SqlNode sqlNode;
     try {
@@ -117,13 +113,20 @@ public class SamzaSqlQueryParser {
       sink = sqlInsert.getTargetTable().toString();
       if (sqlInsert.getSource() instanceof SqlSelect) {
         SqlSelect sqlSelect = (SqlSelect) sqlInsert.getSource();
-        selectQuery = m.group(2);
+        selectQuery = sqlSelect.toString();
+        LOG.info("Parsed select query {} from sql {}", selectQuery, sql);
         sources = getSourcesFromSelectQuery(sqlSelect);
       } else {
-        throw new SamzaException("Sql query is not of the expected format");
+        String msg = String.format("Sql query is not of the expected format. 
Select node expected, found %s",
+            sqlInsert.getSource().getClass().toString());
+        LOG.error(msg);
+        throw new SamzaException(msg);
       }
     } else {
-      throw new SamzaException("Sql query is not of the expected format");
+      String msg = String.format("Sql query is not of the expected format. 
Insert node expected, found %s",
+          sqlNode.getClass().toString());
+      LOG.error(msg);
+      throw new SamzaException(msg);
     }
 
     return new QueryInfo(selectQuery, sources, sink, sql);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
index f15c2f6..a01c32a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/TestSamzaSqlQueryParser.java
@@ -21,9 +21,9 @@ package org.apache.samza.sql.util;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.util.SamzaSqlQueryParser.QueryInfo;
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
 
 public class TestSamzaSqlQueryParser {
 
@@ -31,12 +31,57 @@ public class TestSamzaSqlQueryParser {
   public void testParseQuery() {
     QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo 
select * from tracking.bar");
     Assert.assertEquals("log.foo", queryInfo.getSink());
-    Assert.assertEquals(queryInfo.getSelectQuery(), "select * from 
tracking.bar", queryInfo.getSelectQuery());
     Assert.assertEquals(1, queryInfo.getSources().size());
     Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
   }
 
   @Test
+  public void testParseGroupyByQuery() {
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo 
select b.pageKey, count(*) from tracking.bar as b group by b.pageKey");
+    Assert.assertEquals("log.foo", queryInfo.getSink());
+    Assert.assertEquals(1, queryInfo.getSources().size());
+    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+  }
+
+  @Test
+  public void testParseUnNestSubQuery() {
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery("insert into log.foo 
SELECT * FROM unnest(SELECT int_array_field1 FROM tracking.bar) ");
+    Assert.assertEquals("log.foo", queryInfo.getSink());
+    Assert.assertEquals(1, queryInfo.getSources().size());
+    Assert.assertEquals("tracking.bar", queryInfo.getSources().get(0));
+  }
+
+  @Test
+  public void testParseJoinSubQuery() {
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from (SELECT * FROM testavro.PAGEVIEW pv1 where 
pv1.field1='foo') as pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+    Assert.assertEquals(2, queryInfo.getSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", 
queryInfo.getSources().get(1));
+  }
+
+  @Test
+  public void testParseJoinUnNestQuery() {
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic"
+            + " select p.name as profileName, pv.pageKey"
+            + " from unnest(SELECT int_array_field1 FROM testavro.PAGEVIEW) as 
pv"
+            + " join testavro.PROFILE.`$table` as p"
+            + " on p.id = pv.profileId";
+    QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql);
+    Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getSink());
+    Assert.assertEquals(2, queryInfo.getSources().size());
+    Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getSources().get(0));
+    Assert.assertEquals("testavro.PROFILE.$table", 
queryInfo.getSources().get(1));
+  }
+
+  @Test
   public void testParseJoinQuery() {
     String sql =
         "Insert into testavro.enrichedPageViewTopic"
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index aef7926..51f3d93 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -266,7 +266,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
-        + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) 
+ MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1";
+        + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), 
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     runApplication(new MapConfig(staticConfigs));

Reply via email to