[ 
https://issues.apache.org/jira/browse/BEAM-4044?focusedWorklogId=100399&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100399
 ]

ASF GitHub Bot logged work on BEAM-4044:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/May/18 04:00
            Start Date: 10/May/18 04:00
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5325: [BEAM-4044] [SQL] Just One Parser
URL: https://github.com/apache/beam/pull/5325
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index ce592cfbabb..d7ec10d3e43 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -20,7 +20,6 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.impl.parser.BeamSqlParser;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -28,6 +27,9 @@
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
 
 /**
  * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive 
client.
@@ -54,7 +56,8 @@ public MetaStore getMetaStore() {
   /**
    * Returns a human readable representation of the query execution plan.
    */
-  public String explainQuery(String sqlString) throws Exception {
+  public String explainQuery(String sqlString)
+      throws ValidationException, RelConversionException, SqlParseException {
     BeamRelNode exeTree = env.getPlanner().convertToBeamRel(sqlString);
     String beamPlan = RelOptUtil.toString(exeTree);
     return beamPlan;
@@ -63,9 +66,9 @@ public String explainQuery(String sqlString) throws Exception 
{
   /**
    * Executes the given sql.
    */
-  public void execute(String sqlString) throws Exception {
-    BeamSqlParser parser = new BeamSqlParser(sqlString);
-    SqlNode sqlNode = parser.impl().parseSqlStmtEof();
+  public void execute(String sqlString)
+      throws ValidationException, RelConversionException, SqlParseException {
+    SqlNode sqlNode = env.getPlanner().parse(sqlString);
 
     // DDL nodes are SqlExecutableStatement
     if (sqlNode instanceof SqlExecutableStatement) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
index 45b649e466e..2757f172886 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryTransform.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.sdk.extensions.sql;
 
-
-import static 
org.apache.beam.sdk.extensions.sql.QueryValidationHelper.validateQuery;
 import static org.apache.beam.sdk.extensions.sql.SchemaHelper.toRows;
 
 import com.google.auto.value.AutoValue;
@@ -35,11 +33,13 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
 
 /**
  * A {@link PTransform} representing an execution plan for a SQL query.
@@ -59,20 +59,13 @@
   public PCollection<Row> expand(PInput input) {
     BeamSqlEnv sqlEnv = new BeamSqlEnv(toTableProvider(input));
 
-    if (input instanceof PCollection) {
-      validateQuery(sqlEnv, queryString());
-    }
-
     registerFunctions(sqlEnv);
 
     try {
-      return
-          PCollectionTuple.empty(input.getPipeline()).apply(
-          sqlEnv
-              .getPlanner()
-              .convertToBeamRel(queryString())
-              .toPTransform());
-    } catch (Exception e) {
+      return sqlEnv.getPlanner().compileBeamPipeline(
+          queryString(),
+          input.getPipeline());
+    } catch (ValidationException | RelConversionException | SqlParseException 
e) {
       throw new IllegalStateException(e);
     }
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryValidationHelper.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryValidationHelper.java
deleted file mode 100644
index ed893daa412..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/QueryValidationHelper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql;
-
-import static 
org.apache.beam.sdk.extensions.sql.QueryTransform.PCOLLECTION_NAME;
-
-import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.parser.SqlParseException;
-
-/**
- * QueryValidationHelper.
- */
-class QueryValidationHelper {
-
-  static void validateQuery(BeamSqlEnv sqlEnv, String queryString) {
-    SqlNode sqlNode;
-
-    try {
-      sqlNode = sqlEnv.getPlanner().parseQuery(queryString);
-      sqlEnv.getPlanner().getPlanner().close();
-    } catch (SqlParseException e) {
-      throw new IllegalStateException(e);
-    }
-
-    if (!(sqlNode instanceof SqlSelect)) {
-      throw new UnsupportedOperationException(
-          "Sql operation " + sqlNode.toString() + " is not supported");
-    }
-
-    if (!PCOLLECTION_NAME.equalsIgnoreCase(((SqlSelect) 
sqlNode).getFrom().toString())) {
-      throw new IllegalStateException("Use " + PCOLLECTION_NAME + " as table 
name"
-                                          + " when selecting from single 
PCollection."
-                                          + " Use PCollectionTuple to 
explicitly "
-                                          + "name the input PCollections");
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 3078305e450..30a6a7e8ba6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -46,7 +46,7 @@
  * of the SQL examples. Please consult Beam documentation on how to run 
pipelines.
  */
 class BeamSqlExample {
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args) {
     PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
     Pipeline p = Pipeline.create(options);
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParser.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParser.java
deleted file mode 100644
index bfa4d47c63d..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParser.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.parser;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.StringReader;
-import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
-import org.apache.calcite.config.Lex;
-
-/**
- * SQL Parser which handles DDL for Beam.
- */
-public class BeamSqlParser {
-  public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
-  private final BeamSqlParserImpl impl;
-
-  public BeamSqlParser(String s) {
-    this.impl = new BeamSqlParserImpl(new StringReader(s));
-    this.impl.setTabSize(1);
-    this.impl.setQuotedCasing(Lex.ORACLE.quotedCasing);
-    this.impl.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
-    this.impl.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
-    /*
-     *  By default parser uses [ ] for quoting identifiers. Switching to
-     *  DQID (double quoted identifiers) is needed for array and map access
-     *  (m['x'] = 1 or arr[2] = 10 etc) to work.
-     */
-    this.impl.switchTo("DQID");
-  }
-
-  @VisibleForTesting
-  public BeamSqlParserImpl impl() {
-    return impl;
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index 799224b48c7..e9ea24a06ea 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -162,8 +162,8 @@ Table toTable() {
     return
         Table
             .builder()
-            .type(getString(type).toLowerCase())
-            .name(name.getSimple().toLowerCase())
+            .type(getString(type))
+            .name(name.getSimple())
             .schema(columnList.stream().collect(toSchema()))
             .comment(getString(comment))
             .location(getString(location))
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
index 857da159a7e..95fcb3e9e6c 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropObject.java
@@ -70,7 +70,7 @@ public void execute(CalcitePrepare.Context context) {
     case DROP_TABLE:
       if (schema.schema instanceof BeamCalciteSchema) {
         BeamCalciteSchema beamSchema = (BeamCalciteSchema) schema.schema;
-        
beamSchema.getTableProvider().dropTable(name.getSimple().toLowerCase());
+        beamSchema.getTableProvider().dropTable(name.getSimple());
         existed = true;
       } else {
         existed = schema.removeTable(name.getSimple());
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index 1685d109b06..4e6d6c98b12 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -64,7 +64,7 @@
 public class BeamQueryPlanner {
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamQueryPlanner.class);
 
-  protected final Planner planner;
+  private final FrameworkConfig config;
 
   public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
@@ -89,10 +89,10 @@ public BeamQueryPlanner(SchemaPlus schema) {
         new CalciteCatalogReader(
             CalciteSchema.from(schema).root(), Collections.emptyList(), 
TYPE_FACTORY, null));
 
-    FrameworkConfig config =
+    config =
         Frameworks.newConfigBuilder()
             .parserConfig(SqlParser.configBuilder()
-                .setLex(Lex.MYSQL)
+                .setLex(Lex.JAVA)
                 .setParserFactory(BeamSqlParserImpl.FACTORY)
                 .build())
             .defaultSchema(schema)
@@ -103,14 +103,20 @@ public BeamQueryPlanner(SchemaPlus schema) {
             .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
             .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
             .build();
-    this.planner = Frameworks.getPlanner(config);
   }
 
   /**
    * Parse input SQL query, and return a {@link SqlNode} as grammar tree.
    */
-  public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
-    return planner.parse(sqlQuery);
+  public SqlNode parse(String sqlStatement) throws SqlParseException {
+    Planner planner = getPlanner();
+    SqlNode parsed;
+    try {
+      parsed = planner.parse(sqlStatement);
+    } finally {
+      planner.close();
+    }
+    return parsed;
   }
 
   /**
@@ -119,7 +125,7 @@ public SqlNode parseQuery(String sqlQuery) throws 
SqlParseException{
    * {@code PCollection} so more operations can be applied.
    */
   public PCollection<Row> compileBeamPipeline(String sqlStatement, Pipeline 
basePipeline)
-      throws Exception {
+      throws ValidationException, RelConversionException, SqlParseException {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
     // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel.
@@ -134,6 +140,7 @@ public SqlNode parseQuery(String sqlQuery) throws 
SqlParseException{
   public BeamRelNode convertToBeamRel(String sqlStatement)
       throws ValidationException, RelConversionException, SqlParseException {
     BeamRelNode beamRelNode;
+    Planner planner = getPlanner();
     try {
       SqlNode parsed = planner.parse(sqlStatement);
       SqlNode validated = planner.validate(parsed);
@@ -153,8 +160,8 @@ public BeamRelNode convertToBeamRel(String sqlStatement)
     return beamRelNode;
   }
 
-  public Planner getPlanner() {
-    return planner;
+  private Planner getPlanner() {
+    return Frameworks.getPlanner(config);
   }
 
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index ea4f55f2504..c49ffed17ca 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -516,8 +515,7 @@ public void testUnsupportedDistinct() throws Exception {
 
   @Test
   public void testUnsupportedGlobalWindowWithDefaultTrigger() {
-    exceptions.expect(IllegalStateException.class);
-    exceptions.expectCause(isA(UnsupportedOperationException.class));
+    exceptions.expect(UnsupportedOperationException.class);
 
     pipeline.enableAbandonedNodeEnforcement(false);
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
index 3c1d31067ad..01a3eb987d9 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
@@ -126,22 +126,6 @@ public void testFromInvalidTableName1() throws Exception {
     pipeline.run().waitUntilFinish();
   }
 
-  @Test
-  public void testFromInvalidTableName2() throws Exception {
-    exceptions.expect(IllegalStateException.class);
-    exceptions.expectMessage("Use PCOLLECTION as table name"
-                                 + " when selecting from single PCollection."
-                                 + " Use PCollectionTuple to explicitly "
-                                 + "name the input PCollections");
-    pipeline.enableAbandonedNodeEnforcement(false);
-
-    String sql = "SELECT * FROM PCOLLECTION_NA";
-
-    boundedInput1.apply(BeamSql.query(sql));
-
-    pipeline.run().waitUntilFinish();
-  }
-
   @Test
   public void testInvalidFilter() throws Exception {
     exceptions.expect(IllegalStateException.class);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index 2b19daf149b..41d1d3eadea 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -21,9 +21,6 @@
 import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
 import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
 import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.hasProperty;
-import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.stringContainsInOrder;
 
 import java.util.Arrays;
@@ -39,7 +36,6 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
-import org.hamcrest.Matcher;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.junit.Rule;
@@ -155,7 +151,7 @@ public void testFullOuterJoin() throws Exception {
     pipeline.run();
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test(expected = UnsupportedOperationException.class)
   public void testException_nonEqualJoin() throws Exception {
     String sql =
         "SELECT *  "
@@ -169,10 +165,8 @@ public void testException_nonEqualJoin() throws Exception {
     pipeline.run();
   }
 
-  @Test
+  @Test(expected = UnsupportedOperationException.class)
   public void testException_crossJoin() throws Exception {
-    thrown.expect(IllegalStateException.class);
-
     String sql =
         "SELECT *  "
             + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2";
@@ -230,7 +224,9 @@ public void 
testRejectsUnboundedWithinWindowsWithEndOfWindowTrigger() throws Exc
                    .accumulatingFiredPanes());
     PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, 
"ORDER_DETAILS2", orders);
 
-    thrown.expectCause(expectedSingleFireTrigger());
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        stringContainsInOrder(Arrays.asList("once per window", "default 
trigger")));
 
     inputs.apply("sql", BeamSql.query(sql));
 
@@ -250,7 +246,9 @@ public void 
testRejectsGlobalWindowsWithDefaultTriggerInUnboundedInput() throws
     PCollection<Row> orders = ordersUnbounded();
     PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, 
"ORDER_DETAILS2", orders);
 
-    thrown.expectCause(expectedSingleFireTrigger());
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        stringContainsInOrder(Arrays.asList("once per window", "default 
trigger")));
 
     inputs.apply("sql", BeamSql.query(sql));
 
@@ -276,7 +274,9 @@ public void 
testRejectsGlobalWindowsWithEndOfWindowTrigger() throws Exception {
                    .accumulatingFiredPanes());
     PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, 
"ORDER_DETAILS2", orders);
 
-    thrown.expectCause(expectedSingleFireTrigger());
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        stringContainsInOrder(Arrays.asList("once per window", "default 
trigger")));
 
     inputs.apply("sql", BeamSql.query(sql));
 
@@ -303,7 +303,9 @@ public void 
testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
                 .accumulatingFiredPanes());
     PCollectionTuple inputs = tuple("ORDER_DETAILS1", orders, 
"ORDER_DETAILS2", orders);
 
-    thrown.expectCause(expectedSingleFireTrigger());
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        stringContainsInOrder(Arrays.asList("once per window", "default 
trigger")));
 
     inputs.apply("sql", BeamSql.query(sql));
 
@@ -342,12 +344,4 @@ public void 
testRejectsNonGlobalWindowsWithRepeatingTrigger() throws Exception {
         .apply("join", BeamSql.query(sql))
         .setCoder(RESULT_CODER);
   }
-
-  private Matcher<UnsupportedOperationException> expectedSingleFireTrigger() {
-    return allOf(
-        isA(UnsupportedOperationException.class),
-        hasProperty("message",
-                    stringContainsInOrder(
-                        Arrays.asList("once per window", "default trigger"))));
-  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
index 5a9ca6c2ce3..7149b4b2601 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
@@ -76,10 +76,9 @@ private Table executeCreateTableWith(String fieldType) {
         + "LOCATION '/home/admin/person'\n";
     System.out.println(createTable);
 
-    BeamSqlParser parser = new BeamSqlParser(createTable);
     SqlNode sqlNode;
     try {
-      sqlNode = parser.impl().parseSqlStmtEof();
+      sqlNode = ParserTestUtils.parse(createTable);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
similarity index 92%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
rename to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 8bfe6c53237..398a927598f 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamSqlParserTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -26,16 +26,19 @@
 import com.alibaba.fastjson.JSONObject;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
+import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
 import org.junit.Test;
 
 /**
- * UnitTest for {@link BeamSqlParser}.
+ * UnitTest for {@link BeamSqlParserImpl}.
  */
-public class BeamSqlParserTest {
+public class BeamDDLTest {
+
   @Test
   public void testParseCreateTable_full() throws Exception {
     JSONObject properties = new JSONObject();
@@ -59,7 +62,7 @@ public void testParseCreateTable_full() throws Exception {
     );
   }
 
-  @Test(expected = 
org.apache.beam.sdk.extensions.sql.impl.parser.impl.ParseException.class)
+  @Test(expected = SqlParseException.class)
   public void testParseCreateTable_withoutType() throws Exception {
     parseTable(
         "create table person (\n"
@@ -124,19 +127,17 @@ public void testParseCreateTable_withoutLocation() throws 
Exception {
 
   @Test
   public void testParseDropTable() throws Exception {
-    BeamSqlParser parser = new BeamSqlParser("drop table person");
-    SqlNode sqlNode = parser.impl().parseSqlStmtEof();
+    SqlNode sqlNode = ParserTestUtils.parse("drop table person");
 
     assertNotNull(sqlNode);
     assertTrue(sqlNode instanceof SqlDropTable);
     SqlDropTable stmt = (SqlDropTable) sqlNode;
     assertNotNull(stmt);
-    assertEquals("PERSON", stmt.name.getSimple());
+    assertEquals("person", stmt.name.getSimple());
   }
 
   private Table parseTable(String sql) throws Exception {
-    BeamSqlParser parser = new BeamSqlParser(sql);
-    SqlNode sqlNode = parser.impl().parseSqlStmtEof();
+    SqlNode sqlNode = ParserTestUtils.parse(sql);
 
     assertNotNull(sqlNode);
     assertTrue(sqlNode instanceof SqlCreateTable);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
new file mode 100644
index 00000000000..d06bb4b73f2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Frameworks;
+
+class ParserTestUtils {
+  static SqlNode parse(String sql) throws SqlParseException {
+    BeamQueryPlanner planner = new 
BeamQueryPlanner(Frameworks.createRootSchema(false));
+    return planner.parse(sql);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 100399)
    Time Spent: 16h 20m  (was: 16h 10m)

> Take advantage of Calcite DDL
> -----------------------------
>
>                 Key: BEAM-4044
>                 URL: https://issues.apache.org/jira/browse/BEAM-4044
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 16h 20m
>  Remaining Estimate: 0h
>
> In Calcite 1.15, support for abstract DDL moved into Calcite core. We should 
> take advantage of that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to