[
https://issues.apache.org/jira/browse/BEAM-5644?focusedWorklogId=195288&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195288
]
ASF GitHub Bot logged work on BEAM-5644:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Feb/19 18:34
Start Date: 06/Feb/19 18:34
Worklog Time Spent: 10m
Work Description: akedin commented on pull request #7745: [BEAM-5644]
make Planner configurable
URL: https://github.com/apache/beam/pull/7745#discussion_r254389574
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
##########
@@ -174,8 +106,110 @@ public void executeDdl(String sqlStatement) throws
ParseException {
public String explain(String sqlString) throws ParseException {
try {
return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
- } catch (ValidationException | RelConversionException | SqlParseException
e) {
+ } catch (Exception e) {
throw new ParseException("Unable to parse statement", e);
}
}
+
+ /** BeamSqlEnv's Builder. */
+ public static class BeamSqlEnvBuilder {
+ private final JdbcConnection jdbcConnection;
+ private String plannerName = "CalcitePlanner";
+
+ public static BeamSqlEnvBuilder builder(TableProvider tableProvider) {
+ return new BeamSqlEnvBuilder(tableProvider);
+ }
+
+ private BeamSqlEnvBuilder(TableProvider tableProvider) {
+ jdbcConnection = JdbcDriver.connect(tableProvider);
+ }
+
+ public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>>
methods) {
+ for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+ for (Method method : entry.getValue()) {
+ jdbcConnection.getCurrentSchemaPlus().add(entry.getKey(),
UdfImpl.create(method));
+ }
+ }
+
+ return this;
+ }
+
+ public BeamSqlEnvBuilder addSchema(String name, TableProvider
tableProvider) {
+ jdbcConnection.setSchema(name, tableProvider);
+
+ return this;
+ }
+
+ public BeamSqlEnvBuilder setCurrentSchema(String name) {
+ try {
+ jdbcConnection.setSchema(name);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return this;
+ }
+
+ /** Register a UDF function which can be used in SQL expression. */
+ public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz,
String method) {
+ jdbcConnection.getCurrentSchemaPlus().add(functionName,
UdfImpl.create(clazz, method));
+
+ return this;
+ }
+
+ /** Register a UDF function which can be used in SQL expression. */
+ public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends
BeamSqlUdf> clazz) {
+ return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+ }
+
+ public BeamSqlEnvBuilder registerUdf(String functionName,
SerializableFunction sfn) {
+ return registerUdf(functionName, sfn.getClass(), "apply");
+ }
+
+ /**
+ * Register a UDAF function which can be used in GROUP-BY expression. See
{@link
+ * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a
UDAF.
+ */
+ public BeamSqlEnvBuilder registerUdaf(String functionName,
Combine.CombineFn combineFn) {
+ jdbcConnection.getCurrentSchemaPlus().add(functionName, new
UdafImpl(combineFn));
+
+ return this;
+ }
+
+ /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
+ public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
+ ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
+ .forEach(
+ ins -> {
+ ins.getBeamSqlUdfs().forEach((udfName, udfClass) ->
registerUdf(udfName, udfClass));
+ ins.getSerializableFunctionUdfs()
+ .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
+ ins.getUdafs().forEach((udafName, udafFn) ->
registerUdaf(udafName, udafFn));
+ });
+
+ return this;
+ }
+
+ public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
+ for (BeamBuiltinFunctionProvider provider :
+ ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+ registerBuiltinUdf(provider.getBuiltinMethods());
+ }
+
+ return this;
+ }
+
+ public BeamSqlEnvBuilder setPlannerName(String name) {
+ plannerName = name;
+ return this;
+ }
+
+ public BeamSqlEnv build() {
+ if (plannerName.equals("CalcitePlanner")) {
+ return new BeamSqlEnv(jdbcConnection, new
CalciteQueryPlanner(jdbcConnection));
Review comment:
nit:I think the pattern is usually to pass the builder into the constructor
and then extract builder fields inside the constructor, this way when changing
the builder interface you need to change fewer places
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 195288)
Time Spent: 6h (was: 5h 50m)
> make Planner configurable
> --------------------------
>
> Key: BEAM-5644
> URL: https://issues.apache.org/jira/browse/BEAM-5644
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: Major
> Time Spent: 6h
> Remaining Estimate: 0h
>
> We can make planner configurable here:
> [BeamQueryPlanner.java#L145|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java#L145]
>
> By doing so, we can have different planner implementation to support
> different SQL dialect.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)