[ 
https://issues.apache.org/jira/browse/BEAM-5644?focusedWorklogId=238816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-238816
 ]

ASF GitHub Bot logged work on BEAM-5644:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/May/19 21:31
            Start Date: 07/May/19 21:31
    Worklog Time Spent: 10m 
      Work Description: akedin commented on pull request #7745: [BEAM-5644] 
Pluggable Planners
URL: https://github.com/apache/beam/pull/7745#discussion_r281836770
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 ##########
 @@ -73,109 +80,182 @@ public static BeamSqlEnv inMemory(TableProvider... 
tableProviders) {
     return withTableProvider(inMemoryMetaStore);
   }
 
-  private void registerBuiltinUdf(Map<String, List<Method>> methods) {
-    for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
-      for (Method method : entry.getValue()) {
-        connection.getCurrentSchemaPlus().add(entry.getKey(), 
UdfImpl.create(method));
-      }
-    }
+  public BeamRelNode parseQuery(String query) throws ParseException {
+    return planner.convertToBeamRel(query);
   }
 
-  public void addSchema(String name, TableProvider tableProvider) {
-    connection.setSchema(name, tableProvider);
+  public boolean isDdl(String sqlStatement) throws ParseException {
+    return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
   }
 
-  public void setCurrentSchema(String name) {
-    try {
-      connection.setSchema(name);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
+  public void executeDdl(String sqlStatement) throws ParseException {
+    SqlExecutableStatement ddl = (SqlExecutableStatement) 
planner.parse(sqlStatement);
+    ddl.execute(getContext());
   }
 
-  /** Register a UDF function which can be used in SQL expression. */
-  public void registerUdf(String functionName, Class<?> clazz, String method) {
-    connection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, 
method));
+  public CalcitePrepare.Context getContext() {
+    return connection.createPrepareContext();
   }
 
-  /** Register a UDF function which can be used in SQL expression. */
-  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> 
clazz) {
-    registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+  public Map<String, String> getPipelineOptions() {
+    return connection.getPipelineOptionsMap();
   }
 
-  /**
-   * Register {@link SerializableFunction} as a UDF function which can be used 
in SQL expression.
-   * Note, {@link SerializableFunction} must have a constructor without 
arguments.
-   */
-  public void registerUdf(String functionName, SerializableFunction sfn) {
-    registerUdf(functionName, sfn.getClass(), "apply");
+  public String explain(String sqlString) throws ParseException {
+    try {
+      return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
+    } catch (Exception e) {
+      throw new ParseException("Unable to parse statement", e);
+    }
   }
 
-  /**
-   * Register a UDAF function which can be used in GROUP-BY expression. See 
{@link
-   * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a 
UDAF.
-   */
-  public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
-    connection.getCurrentSchemaPlus().add(functionName, new 
UdafImpl(combineFn));
-  }
+  /** BeamSqlEnv's Builder. */
+  public static class BeamSqlEnvBuilder {
+    private String queryPlannerClassName =
+        "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
 
-  /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
-  public void loadUdfUdafFromProvider() {
-    ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
-        .forEach(
-            ins -> {
-              ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> 
registerUdf(udfName, udfClass));
-              ins.getSerializableFunctionUdfs()
-                  .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
-              ins.getUdafs().forEach((udafName, udafFn) -> 
registerUdaf(udafName, udafFn));
-            });
-  }
+    private TableProvider initialTableProvider;
+    private String currentSchemaName;
+    private Map<String, TableProvider> schemaMap = new HashMap<>();
+    private Set<Map.Entry<String, Function>> functionSet = new HashSet<>();
 
-  public void loadBeamBuiltinFunctions() {
-    for (BeamBuiltinFunctionProvider provider :
-        ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
-      registerBuiltinUdf(provider.getBuiltinMethods());
+    public BeamSqlEnvBuilder setInitializeTableProvider(TableProvider 
tableProvider) {
+      initialTableProvider = tableProvider;
+      return this;
     }
-  }
 
-  public BeamRelNode parseQuery(String query) throws ParseException {
-    try {
-      return planner.convertToBeamRel(query);
-    } catch (ValidationException | RelConversionException | SqlParseException 
e) {
-      throw new ParseException(String.format("Unable to parse query %s", 
query), e);
+    public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> 
methods) {
+      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+        for (Method method : entry.getValue()) {
+          functionSet.add(new SimpleEntry<>(entry.getKey(), 
UdfImpl.create(method)));
+        }
+      }
+      return this;
     }
-  }
 
-  public boolean isDdl(String sqlStatement) throws ParseException {
-    try {
-      return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
-    } catch (SqlParseException e) {
-      throw new ParseException("Unable to parse statement", e);
+    public BeamSqlEnvBuilder addSchema(String name, TableProvider 
tableProvider) {
+      if (schemaMap.containsKey(name)) {
+        throw new RuntimeException("Schema " + name + " is registered twice.");
+      }
+
+      schemaMap.put(name, tableProvider);
+      return this;
     }
-  }
 
-  public void executeDdl(String sqlStatement) throws ParseException {
-    try {
-      SqlExecutableStatement ddl = (SqlExecutableStatement) 
planner.parse(sqlStatement);
-      ddl.execute(getContext());
-    } catch (SqlParseException e) {
-      throw new ParseException("Unable to parse DDL statement", e);
+    public BeamSqlEnvBuilder setCurrentSchema(String name) {
+      currentSchemaName = name;
+      return this;
     }
-  }
 
-  public CalcitePrepare.Context getContext() {
-    return connection.createPrepareContext();
-  }
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, 
String method) {
+      functionSet.add(new SimpleEntry<>(functionName, UdfImpl.create(clazz, 
method)));
 
-  public Map<String, String> getPipelineOptions() {
-    return connection.getPipelineOptionsMap();
-  }
+      return this;
+    }
 
-  public String explain(String sqlString) throws ParseException {
-    try {
-      return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
-    } catch (ValidationException | RelConversionException | SqlParseException 
e) {
-      throw new ParseException("Unable to parse statement", e);
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends 
BeamSqlUdf> clazz) {
+      return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+    }
+
+    public BeamSqlEnvBuilder registerUdf(String functionName, 
SerializableFunction sfn) {
+      return registerUdf(functionName, sfn.getClass(), "apply");
+    }
+
+    /**
+     * Register a UDAF function which can be used in GROUP-BY expression. See 
{@link
+     * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a 
UDAF.
+     */
+    public BeamSqlEnvBuilder registerUdaf(String functionName, 
Combine.CombineFn combineFn) {
+      functionSet.add(new SimpleEntry<>(functionName, new 
UdafImpl(combineFn)));
+      return this;
+    }
+
+    /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
+    public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
+      ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
+          .forEach(
+              ins -> {
+                ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> 
registerUdf(udfName, udfClass));
+                ins.getSerializableFunctionUdfs()
+                    .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
+                ins.getUdafs().forEach((udafName, udafFn) -> 
registerUdaf(udafName, udafFn));
+              });
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
+      for (BeamBuiltinFunctionProvider provider :
+          ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+        registerBuiltinUdf(provider.getBuiltinMethods());
+      }
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder setQueryPlannerClassName(String name) {
+      queryPlannerClassName = name;
+      return this;
+    }
+
+    /**
+     * Build function to create an instance of BeamSqlEnv based on preset 
fields.
+     *
+     * @return BeamSqlEnv.
+     */
+    public BeamSqlEnv build() {
+      // This check is to retain backward compatibility because most 
BeamSqlEnv instances are initialized by
+      // the withTableProvider API.
+      if (initialTableProvider == null) {
+        throw new RuntimeException("initialTableProvider must be set in 
BeamSqlEnvBuilder.");
+      }
+
+      JdbcConnection jdbcConnection = JdbcDriver.connect(initialTableProvider);
+
+      // set schema
+      for (Map.Entry<String, TableProvider> schemaEntry : 
schemaMap.entrySet()) {
+        jdbcConnection.setSchema(schemaEntry.getKey(), schemaEntry.getValue());
+      }
+
+      // reset default schema
+      if (currentSchemaName != null) {
+        try {
+          jdbcConnection.setSchema(currentSchemaName);
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      // add UDF
+      for (Map.Entry<String, Function> functionEntry : functionSet) {
+        jdbcConnection.getCurrentSchemaPlus().add(functionEntry.getKey(), 
functionEntry.getValue());
+      }
+
+      QueryPlanner planner;
+
+      if (queryPlannerClassName.equals(
 
 Review comment:
   extract into a function
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 238816)
    Time Spent: 10.5h  (was: 10h 20m)

> make Planner configurable 
> --------------------------
>
>                 Key: BEAM-5644
>                 URL: https://issues.apache.org/jira/browse/BEAM-5644
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> We can make planner configurable here: 
> [BeamQueryPlanner.java#L145|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java#L145]
>  
> By doing so, we can have different planner implementation to support 
> different SQL dialect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to