[
https://issues.apache.org/jira/browse/BEAM-3773?focusedWorklogId=103112&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103112
]
ASF GitHub Bot logged work on BEAM-3773:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/May/18 20:58
Start Date: 17/May/18 20:58
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5399: [BEAM-3773] JDBC
URL: https://github.com/apache/beam/pull/5399
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git a/build.gradle b/build.gradle
index 74e82b886d7..879107d28d1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -109,6 +109,9 @@ rat {
// VCF test files
"**/apache_beam/testing/data/vcf/*",
+
+ // JDBC package config files
+ "**/META-INF/services/java.sql.Driver",
]
// Add .gitignore excludes to the Apache Rat exclusion list. We re-create
the behavior
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
new file mode 100644
index 00000000000..30a9af5fd0e
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Map;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaTableProvider;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Factory that creates a {@link BeamCalciteSchema}.
+ */
+public class BeamCalciteSchemaFactory implements SchemaFactory {
+ public static final BeamCalciteSchemaFactory INSTANCE = new
BeamCalciteSchemaFactory();
+
+ private BeamCalciteSchemaFactory() {
+ }
+
+ @Override
+ public Schema create(SchemaPlus parentSchema, String name,
+ Map<String, Object> operand) {
+ MetaStore metaStore = new InMemoryMetaStore();
+ metaStore.registerProvider(new BigQueryTableProvider());
+ metaStore.registerProvider(new KafkaTableProvider());
+ metaStore.registerProvider(new PubsubJsonTableProvider());
+ metaStore.registerProvider(new TextTableProvider());
+ return new BeamCalciteSchema(metaStore);
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index e925eeb8e62..da083a0448e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -20,7 +20,6 @@
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -45,18 +44,15 @@
class BeamCalciteTable extends AbstractQueryableTable
implements ModifiableTable, TranslatableTable {
private final BeamSqlTable beamTable;
- private final RelDataType rowType;
public BeamCalciteTable(BeamSqlTable beamTable) {
super(Object[].class);
this.beamTable = beamTable;
- this.rowType = CalciteUtils.toCalciteRowType(this.beamTable.getSchema(),
- BeamQueryPlanner.TYPE_FACTORY);
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return rowType;
+ return CalciteUtils.toCalciteRowType(this.beamTable.getSchema(),
typeFactory);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index bbc3165175f..0c63c72f6ec 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
-import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
@@ -26,14 +25,10 @@
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.tools.RelRunner;
/**
* {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and
{@link BeamSqlCli}.
@@ -42,21 +37,21 @@
* {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL
queries.
*/
public class BeamSqlEnv {
- final CalciteSchema schema;
- final CalciteSchema defaultSchema;
+ final CalciteConnection connection;
+ final SchemaPlus defaultSchema;
final BeamQueryPlanner planner;
public BeamSqlEnv(TableProvider tableProvider) {
- schema = CalciteSchema.createRootSchema(true, false);
- defaultSchema = schema.add("beam", new BeamCalciteSchema(tableProvider));
- planner = new BeamQueryPlanner(defaultSchema.plus());
+ connection = JdbcDriver.connect(tableProvider);
+ defaultSchema = JdbcDriver.getDefaultSchema(connection);
+ planner = new BeamQueryPlanner(connection);
}
/**
* Register a UDF function which can be used in SQL expression.
*/
public void registerUdf(String functionName, Class<?> clazz, String method) {
- schema.plus().add(functionName, ScalarFunctionImpl.create(clazz, method));
+ defaultSchema.add(functionName, ScalarFunctionImpl.create(clazz, method));
}
/**
@@ -79,7 +74,7 @@ public void registerUdf(String functionName,
SerializableFunction sfn) {
* See {@link org.apache.beam.sdk.transforms.Combine.CombineFn} on how to
implement a UDAF.
*/
public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
- schema.plus().add(functionName, new UdafImpl(combineFn));
+ defaultSchema.add(functionName, new UdafImpl(combineFn));
}
public BeamQueryPlanner getPlanner() {
@@ -87,53 +82,6 @@ public BeamQueryPlanner getPlanner() {
}
public CalcitePrepare.Context getContext() {
- return new ContextImpl();
- }
-
- private class ContextImpl implements CalcitePrepare.Context {
- @Override
- public JavaTypeFactory getTypeFactory() {
- return planner.TYPE_FACTORY;
- }
-
- @Override
- public CalciteSchema getRootSchema() {
- return schema;
- }
-
- @Override
- public CalciteSchema getMutableRootSchema() {
- return getRootSchema();
- }
-
- @Override
- public List<String> getDefaultSchemaPath() {
- return defaultSchema.path(null);
- }
-
- @Override
- public List<String> getObjectPath() {
- return null;
- }
-
- @Override
- public CalciteConnectionConfig config() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DataContext getDataContext() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public RelRunner getRelRunner() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CalcitePrepare.SparkHandler spark() {
- throw new UnsupportedOperationException();
- }
+ return connection.createPrepareContext();
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
new file mode 100644
index 00000000000..c733cf422ee
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.calcite.avatica.ConnectionProperty;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.Driver;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Calcite JDBC driver with Beam defaults.
+ */
+public class JdbcDriver extends Driver {
+ public static final JdbcDriver INSTANCE = new JdbcDriver();
+ public static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
+ private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";
+
+ static {
+ INSTANCE.register();
+ }
+
+ @Override protected String getConnectStringPrefix() {
+ return CONNECT_STRING_PREFIX;
+ }
+
+ @Override public Connection connect(String url, Properties info) throws
SQLException {
+ final BeamCalciteSchema beamCalciteSchema = (BeamCalciteSchema)
info.get(BEAM_CALCITE_SCHEMA);
+
+ Properties info2 = new Properties(info);
+ setDefault(info2, CalciteConnectionProperty.LEX, Lex.JAVA.name());
+ setDefault(info2, CalciteConnectionProperty.PARSER_FACTORY,
+ BeamSqlParserImpl.class.getName() + "#FACTORY");
+ setDefault(info2, CalciteConnectionProperty.TYPE_SYSTEM,
+ BeamRelDataTypeSystem.class.getName());
+ setDefault(info2, CalciteConnectionProperty.SCHEMA, "beam");
+ setDefault(info2, CalciteConnectionProperty.SCHEMA_FACTORY,
+ BeamCalciteSchemaFactory.class.getName());
+
+ CalciteConnection connection = (CalciteConnection) super.connect(url,
info2);
+ final SchemaPlus defaultSchema;
+ if (beamCalciteSchema != null) {
+ defaultSchema = connection.getRootSchema()
+ .add(connection.config().schema(), beamCalciteSchema);
+ connection.setSchema(defaultSchema.getName());
+ } else {
+ defaultSchema = getDefaultSchema(connection);
+ }
+
+ // Beam schema may change without notifying Calcite
+ defaultSchema.setCacheEnabled(false);
+ return connection;
+ }
+
+ private static void setDefault(Properties info, ConnectionProperty key,
String value) {
+ // A null value indicates the default. We want to override defaults only.
+ if (info.getProperty(key.camelName()) == null) {
+ info.setProperty(key.camelName(), value);
+ }
+ }
+
+ static CalciteConnection connect(TableProvider tableProvider) {
+ try {
+ Properties info = new Properties();
+ info.put(BEAM_CALCITE_SCHEMA, new BeamCalciteSchema(tableProvider));
+ return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static SchemaPlus getDefaultSchema(CalciteConnection connection) {
+ try {
+ String defaultSchemaName = connection.getSchema();
+ return connection.getRootSchema().getSubSchema(defaultSchemaName);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
index 94c0d96672f..4ad67472e59 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamQueryPlanner.java
@@ -17,20 +17,17 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.planner;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
+import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptUtil;
@@ -39,13 +36,13 @@
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
@@ -65,33 +62,47 @@
private final FrameworkConfig config;
- public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT);
+ public BeamQueryPlanner(CalciteConnection connection) {
+ final CalciteConnectionConfig config = connection.config();
+ final SqlParser.ConfigBuilder parserConfig = SqlParser.configBuilder()
+ .setQuotedCasing(config.quotedCasing())
+ .setUnquotedCasing(config.unquotedCasing())
+ .setQuoting(config.quoting())
+ .setConformance(config.conformance())
+ .setCaseSensitive(config.caseSensitive());
+ final SqlParserImplFactory parserFactory =
+ config.parserFactory(SqlParserImplFactory.class, null);
+ if (parserFactory != null) {
+ parserConfig.setParserFactory(parserFactory);
+ }
+
+ final SchemaPlus schema = connection.getRootSchema();
+ final SchemaPlus defaultSchema = JdbcDriver.getDefaultSchema(connection);
- public BeamQueryPlanner(SchemaPlus schema) {
- final List<RelTraitDef> traitDefs = new ArrayList<>();
- traitDefs.add(ConventionTraitDef.INSTANCE);
- traitDefs.add(RelCollationTraitDef.INSTANCE);
+ final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(
+ ConventionTraitDef.INSTANCE,
+ RelCollationTraitDef.INSTANCE);
- List<SqlOperatorTable> sqlOperatorTables = new ArrayList<>();
- sqlOperatorTables.add(SqlStdOperatorTable.instance());
- sqlOperatorTables.add(
+ final CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
- CalciteSchema.from(schema).root(), Collections.emptyList(),
TYPE_FACTORY, null));
+ CalciteSchema.from(schema),
+ ImmutableList.of(defaultSchema.getName()),
+ connection.getTypeFactory(),
+ connection.config());
+ final SqlOperatorTable opTab0 =
+ connection.config().fun(SqlOperatorTable.class,
+ SqlStdOperatorTable.instance());
- config =
+ this.config =
Frameworks.newConfigBuilder()
- .parserConfig(SqlParser.configBuilder()
- .setLex(Lex.JAVA)
- .setParserFactory(BeamSqlParserImpl.FACTORY)
- .build())
- .defaultSchema(schema)
+ .parserConfig(parserConfig.build())
+ .defaultSchema(defaultSchema)
.traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
+ .context(Contexts.of(connection.config()))
.ruleSets(BeamRuleSets.getRuleSets())
.costFactory(null)
- .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
- .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
+ .typeSystem(connection.getTypeFactory().getTypeSystem())
+ .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
.build();
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
index 5734653c4f9..7dc16e14840 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java
@@ -25,7 +25,10 @@
*
*/
public class BeamRelDataTypeSystem extends RelDataTypeSystemImpl {
- public static final RelDataTypeSystem BEAM_REL_DATATYPE_SYSTEM = new
BeamRelDataTypeSystem();
+ public static final RelDataTypeSystem INSTANCE = new BeamRelDataTypeSystem();
+
+ private BeamRelDataTypeSystem() {
+ }
@Override
public int getMaxNumericScale() {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
index 11e4f5e8616..0fcde375880 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamLogicalConvention.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
@@ -53,6 +55,9 @@ public boolean satisfies(RelTrait trait) {
@Override
public void register(RelOptPlanner planner) {
+ for (RelOptRule rule : BeamRuleSets.getRuleSets()[0]) {
+ planner.addRule(rule);
+ }
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/resources/META-INF/services/java.sql.Driver
b/sdks/java/extensions/sql/src/main/resources/META-INF/services/java.sql.Driver
new file mode 100644
index 00000000000..ecea8158b70
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/resources/META-INF/services/java.sql.Driver
@@ -0,0 +1 @@
+org.apache.beam.sdk.extensions.sql.impl.JdbcDriver
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
new file mode 100644
index 00000000000..bff6bfed892
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.beam.sdk.extensions.sql.meta.provider.BeamSqlTableProvider;
+import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.junit.Test;
+
+/**
+ * Test for {@link JdbcDriver}.
+ */
+public class JdbcDriverTest {
+
+ @Test
+ public void testDriverManager_getDriver() throws Exception {
+ Driver driver = DriverManager.getDriver(JdbcDriver.CONNECT_STRING_PREFIX);
+ assertTrue(driver instanceof JdbcDriver);
+ }
+
+ @Test
+ public void testDriverManager_simple() throws Exception {
+ Connection connection =
DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX);
+ Statement statement = connection.createStatement();
+ // SELECT 1 is a special case and does not reach the parser
+ assertTrue(statement.execute("SELECT 1"));
+ }
+
+ @Test
+ public void testDriverManager_parse() throws Exception {
+ Connection connection =
DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX);
+ Statement statement = connection.createStatement();
+ assertTrue(statement.execute("SELECT 'beam'"));
+ }
+
+ @Test
+ public void testDriverManager_ddl() throws Exception {
+ Connection connection =
DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX);
+
+ // Ensure no tables
+ final DatabaseMetaData metadata = connection.getMetaData();
+ ResultSet resultSet = metadata.getTables(null, null, null, new String[]
{"TABLE"});
+ assertFalse(resultSet.next());
+
+ // Create tables
+ Statement statement = connection.createStatement();
+ assertEquals(0, statement.executeUpdate(
+ "CREATE TABLE test (id INTEGER) TYPE 'text'"));
+
+ // Ensure table test
+ resultSet = metadata.getTables(null, null, null, new String[] {"TABLE"});
+ assertTrue(resultSet.next());
+ assertEquals("test", resultSet.getString("TABLE_NAME"));
+ assertFalse(resultSet.next());
+
+ // Drop the table
+ assertEquals(0, statement.executeUpdate(
+ "DROP TABLE test"));
+
+ // Ensure no tables
+ resultSet = metadata.getTables(null, null, null, new String[] {"TABLE"});
+ assertFalse(resultSet.next());
+ }
+
+ @Test
+ public void testInternalConnect_boundedTable() throws Exception {
+ BeamSqlTableProvider tableProvider = new BeamSqlTableProvider("test",
ImmutableMap.of(
+ "test",
+ MockedBoundedTable
+ .of(TypeName.INT32, "id",
+ TypeName.STRING, "name")
+ .addRows(
+ 1, "first")));
+ CalciteConnection connection = JdbcDriver.connect(tableProvider);
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("SELECT * FROM test");
+ assertTrue(resultSet.next());
+ assertEquals(1, resultSet.getInt("id"));
+ assertEquals("first", resultSet.getString("name"));
+ assertFalse(resultSet.next());
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
index 637bfb08938..477ed866009 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java
@@ -56,7 +56,6 @@
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -237,7 +236,7 @@ public void testBuildExpression_string() {
Arrays.asList(
rexBuilder.makeLiteral("hello"),
rexBuilder.makeLiteral("worldhello"),
-
rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
+
rexBuilder.makeCast(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
rexBuilder.makeBigintLiteral(BigDecimal.ONE))
)
);
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 50dc8e302a5..dbce8db8a2b 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -20,8 +20,6 @@
import java.util.ArrayList;
import java.util.List;
import
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.values.Row;
@@ -48,7 +46,7 @@
/** base class to test {@link BeamSqlFnExecutor} and subclasses of {@link
BeamSqlExpression}. */
public class BeamSqlFnExecutorTestBase {
static final JavaTypeFactory TYPE_FACTORY = new
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
- static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+ static RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(),
rexBuilder);
static RelDataType relDataType;
static RelBuilder relBuilder;
@@ -83,7 +81,7 @@ public static void prepare() {
.context(Contexts.EMPTY_CONTEXT)
.ruleSets(BeamRuleSets.getRuleSets())
.costFactory(null)
- .typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM)
+ .typeSystem(TYPE_FACTORY.getTypeSystem())
.build();
relBuilder = RelBuilder.create(config);
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
index d06bb4b73f2..578c19b452e 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/ParserTestUtils.java
@@ -17,14 +17,17 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.parser;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.meta.provider.BeamSqlTableProvider;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.Frameworks;
class ParserTestUtils {
+ private static final BeamSqlEnv env =
+ new BeamSqlEnv(new BeamSqlTableProvider("test", ImmutableMap.of()));
+
static SqlNode parse(String sql) throws SqlParseException {
- BeamQueryPlanner planner = new
BeamQueryPlanner(Frameworks.createRootSchema(false));
- return planner.parse(sql);
+ return env.getPlanner().parse(sql);
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
index 3a52cacfb2b..d148db034e8 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java
@@ -26,7 +26,6 @@
import java.math.BigDecimal;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -39,12 +38,15 @@
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.junit.Test;
@@ -53,14 +55,15 @@
* Test for {@code BeamEnumerableConverter}.
*/
public class BeamEnumerableConverterTest {
- static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+ static final JavaTypeFactory TYPE_FACTORY = new
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ static RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
static PipelineOptions options = PipelineOptionsFactory.create();
static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(),
rexBuilder);
@Test
public void testToEnumerable_collectSingle() {
Schema schema = Schema.builder().addInt64Field("id", false).build();
- RelDataType type = CalciteUtils.toCalciteRowType(schema,
BeamQueryPlanner.TYPE_FACTORY);
+ RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
ImmutableList<ImmutableList<RexLiteral>> tuples =
ImmutableList.of(ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO)));
BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null);
@@ -78,7 +81,7 @@ public void testToEnumerable_collectSingle() {
public void testToEnumerable_collectMultiple() {
Schema schema =
Schema.builder().addInt64Field("id", false).addInt64Field("otherid",
false).build();
- RelDataType type = CalciteUtils.toCalciteRowType(schema,
BeamQueryPlanner.TYPE_FACTORY);
+ RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
ImmutableList<ImmutableList<RexLiteral>> tuples =
ImmutableList.of(
ImmutableList.of(
@@ -132,7 +135,7 @@ public void processElement(ProcessContext context) {}
@Test
public void testToEnumerable_count() {
Schema schema = Schema.builder().addInt64Field("id", false).build();
- RelDataType type = CalciteUtils.toCalciteRowType(schema,
BeamQueryPlanner.TYPE_FACTORY);
+ RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY);
ImmutableList<ImmutableList<RexLiteral>> tuples =
ImmutableList.of(
ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO)),
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
index c0ed677b126..1d28fc43519 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
@@ -19,7 +19,6 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
@@ -30,6 +29,9 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.csv.CSVFormat;
import org.junit.Rule;
@@ -84,8 +86,9 @@
}
private static Schema genRowType() {
+ JavaTypeFactory typeFactory = new
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
return CalciteUtils.toBeamSchema(
- BeamQueryPlanner.TYPE_FACTORY.builder()
+ typeFactory.builder()
.add("order_id", SqlTypeName.BIGINT)
.add("site_id", SqlTypeName.INTEGER)
.add("price", SqlTypeName.DOUBLE)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 103112)
Time Spent: 7h 10m (was: 7h)
> [SQL] Investigate JDBC interface for Beam SQL
> ---------------------------------------------
>
> Key: BEAM-3773
> URL: https://issues.apache.org/jira/browse/BEAM-3773
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Anton Kedin
> Assignee: Andrew Pilloud
> Priority: Major
> Time Spent: 7h 10m
> Remaining Estimate: 0h
>
> JDBC allows integration with a lot of third-party tools, e.g.,
> [Zeppelin|https://zeppelin.apache.org/docs/0.7.0/manual/interpreters.html],
> [sqlline|https://github.com/julianhyde/sqlline]. We should look into how
> feasible it is to implement a JDBC interface for Beam SQL.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)