[
https://issues.apache.org/jira/browse/BEAM-3773?focusedWorklogId=113394&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113394
]
ASF GitHub Bot logged work on BEAM-3773:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jun/18 21:38
Start Date: 19/Jun/18 21:38
Worklog Time Spent: 10m
Work Description: XuMingmin closed pull request #5592: [BEAM-3773] [SQL]
Add support for setting PipelineOptions
URL: https://github.com/apache/beam/pull/5592
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 264c78cb569..e2ba55af220 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -26,6 +26,7 @@ data: {
"org.apache.calcite.sql.SqlDrop"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateTable"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes"
+ "org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
"org.apache.beam.sdk.schemas.Schema"
"org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
"org.apache.calcite.sql.type.SqlTypeName"
@@ -49,6 +50,7 @@ data: {
# List of methods for parsing custom SQL statements.
statementParserMethods: [
+ "SqlSetOptionBeam(Span.of(), null)"
]
# List of methods for parsing custom literals.
@@ -63,6 +65,7 @@ data: {
# List of methods for parsing extensions to "ALTER <scope>" calls.
# Each must accept arguments "(SqlParserPos pos, String scope)".
alterStatementParserMethods: [
+ "SqlSetOptionBeam"
]
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index d7bdb04b0b1..57122187594 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -285,4 +285,48 @@ Schema.FieldType SimpleType() :
}
}
+SqlSetOptionBeam SqlSetOptionBeam(Span s, String scope) :
+{
+ SqlIdentifier name;
+ final SqlNode val;
+}
+{
+ (
+ <SET> {
+ s.add(this);
+ }
+ name = CompoundIdentifier()
+ <EQ>
+ (
+ val = Literal()
+ |
+ val = SimpleIdentifier()
+ |
+ <ON> {
+ // OFF is handled by SimpleIdentifier, ON handled here.
+ val = new SqlIdentifier(token.image.toUpperCase(Locale.ROOT),
+ getPos());
+ }
+ )
+ {
+ return new SqlSetOptionBeam(s.end(val), scope, name, val);
+ }
+ |
+ <RESET> {
+ s.add(this);
+ }
+ (
+ name = CompoundIdentifier()
+ |
+ <ALL> {
+ name = new SqlIdentifier(token.image.toUpperCase(Locale.ROOT),
+ getPos());
+ }
+ )
+ {
+ return new SqlSetOptionBeam(s.end(name), scope, name, null);
+ }
+ )
+}
+
// End parserImpls.ftl
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 67366435731..bb06d72a33c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -21,9 +21,9 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollectionTuple;
/** {@link BeamSqlCli} provides methods to execute Beam SQL with an
interactive client. */
@@ -56,9 +56,7 @@ public void execute(String sqlString) throws ParseException {
env.executeDdl(sqlString);
} else {
PipelineOptions options =
- PipelineOptionsFactory.fromArgs(new String[] {})
- .withValidation()
- .as(PipelineOptions.class);
+
BeamEnumerableConverter.createPipelineOptions(env.getPipelineOptions());
options.setJobName("BeamPlanCreator");
Pipeline pipeline = Pipeline.create(options);
PCollectionTuple.empty(pipeline).apply(env.parseQuery(sqlString));
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
index 7e93469dd61..5fe4a05b69b 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -31,16 +33,22 @@
/** Adapter from {@link TableProvider} to {@link Schema}. */
public class BeamCalciteSchema implements Schema {
- private TableProvider tableProvider;
+ private final TableProvider tableProvider;
+ private final Map<String, String> pipelineOptions;
public BeamCalciteSchema(TableProvider tableProvider) {
this.tableProvider = tableProvider;
+ this.pipelineOptions = Maps.newHashMap();
}
public TableProvider getTableProvider() {
return tableProvider;
}
+ public Map<String, String> getPipelineOptions() {
+ return pipelineOptions;
+ }
+
@Override
public boolean isMutable() {
return true;
@@ -67,7 +75,7 @@ public Expression getExpression(SchemaPlus parentSchema,
String name) {
if (table == null) {
return null;
}
- return new BeamCalciteTable(tableProvider.buildBeamSqlTable(table));
+ return new BeamCalciteTable(tableProvider.buildBeamSqlTable(table),
getPipelineOptions());
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index d20d5cd5faa..8007d1eae92 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
@@ -42,10 +43,12 @@
class BeamCalciteTable extends AbstractQueryableTable
implements ModifiableTable, TranslatableTable {
private final BeamSqlTable beamTable;
+ private final Map<String, String> pipelineOptions;
- public BeamCalciteTable(BeamSqlTable beamTable) {
+ public BeamCalciteTable(BeamSqlTable beamTable, Map<String, String>
pipelineOptions) {
super(Object[].class);
this.beamTable = beamTable;
+ this.pipelineOptions = pipelineOptions;
}
@Override
@@ -55,7 +58,7 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory)
{
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable
relOptTable) {
- return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable);
+ return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable,
pipelineOptions);
}
@Override
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 194e7503363..bd3278abf33 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -34,6 +34,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
@@ -132,6 +133,10 @@ public void executeDdl(String sqlStatement) throws
ParseException {
return connection.createPrepareContext();
}
+ public Map<String, String> getPipelineOptions() {
+ return ((BeamCalciteSchema)
CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
+ }
+
public String explain(String sqlString) throws ParseException {
try {
return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
index c8496ff9767..a41579a2065 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateTable.java
@@ -23,7 +23,6 @@
import com.alibaba.fastjson.JSONObject;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -34,14 +33,12 @@
import org.apache.calcite.sql.SqlExecutableStatement;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Pair;
/** Parse tree for {@code CREATE TABLE} statement. */
@@ -149,25 +146,17 @@ private void unparseColumn(SqlWriter writer, Schema.Field
column) {
}
}
- private @Nullable String getString(SqlNode n) {
- if (n == null) {
- return null;
- }
- if (n instanceof SqlIdentifier) {
- return ((SqlIdentifier) n).toString();
- }
- return ((NlsString) SqlLiteral.value(n)).getValue();
- }
-
private Table toTable() {
return Table.builder()
- .type(getString(type))
+ .type(SqlDdlNodes.getString(type))
.name(name.getSimple())
.schema(columnList.stream().collect(toSchema()))
- .comment(getString(comment))
- .location(getString(location))
+ .comment(SqlDdlNodes.getString(comment))
+ .location(SqlDdlNodes.getString(location))
.properties(
- (tblProperties == null) ? new JSONObject() :
parseObject(getString(tblProperties)))
+ (tblProperties == null)
+ ? new JSONObject()
+ : parseObject(SqlDdlNodes.getString(tblProperties)))
.build();
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index b4f1ba41cc4..dd816a8b1be 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -17,12 +17,15 @@
package org.apache.beam.sdk.extensions.sql.impl.parser;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
@@ -59,6 +62,16 @@ public static SqlNode column(
}
return Pair.of(schema, name);
}
+
+ static @Nullable String getString(SqlNode n) {
+ if (n == null) {
+ return null;
+ }
+ if (n instanceof SqlIdentifier) {
+ return ((SqlIdentifier) n).toString();
+ }
+ return ((NlsString) SqlLiteral.value(n)).getValue();
+ }
}
// End SqlDdlNodes.java
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
new file mode 100644
index 00000000000..fdc5f04feee
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.sql.SqlExecutableStatement;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSetOption;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Pair;
+
+/** SQL parse tree node to represent {@code SET} and {@code RESET} statements.
*/
+public class SqlSetOptionBeam extends SqlSetOption implements
SqlExecutableStatement {
+
+ public SqlSetOptionBeam(SqlParserPos pos, String scope, SqlIdentifier name,
SqlNode value) {
+ super(pos, scope, name, value);
+ }
+
+ @Override
+ public void execute(CalcitePrepare.Context context) {
+ final SqlIdentifier name = getName();
+ final SqlNode value = getValue();
+ final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true,
name);
+ if (!(pair.left.schema instanceof BeamCalciteSchema)) {
+ throw SqlUtil.newContextException(
+ name.getParserPosition(),
+ RESOURCE.internal("Schema is not instanceof BeamCalciteSchema"));
+ }
+ BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
+ Map<String, String> options = schema.getPipelineOptions();
+ if (options == null) {
+ throw SqlUtil.newContextException(
+ name.getParserPosition(),
+ RESOURCE.internal("PipelineOptions not accessible via
BeamCalciteSchema"));
+ }
+ if (value == null) {
+ if ("ALL".equals(pair.right)) {
+ options.clear();
+ } else {
+ options.remove(pair.right);
+ }
+ } else {
+ options.put(pair.right, SqlDdlNodes.getString(value));
+ }
+ }
+}
+
+// End SqlSetOptionBeam.java
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 71bf85667b8..32e4564d408 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -32,6 +32,7 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
@@ -60,8 +61,6 @@
/** BeamRelNode to replace a {@code Enumerable} node. */
public class BeamEnumerableConverter extends ConverterImpl implements
EnumerableRel {
- private final PipelineOptions options = PipelineOptionsFactory.create();
-
public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits,
RelNode input) {
super(cluster, ConventionTraitDef.INSTANCE, traits, input);
}
@@ -83,12 +82,27 @@ public Result implement(EnumerableRelImplementor
implementor, Prefer prefer) {
final RelDataType rowType = getRowType();
final PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
prefer.preferArray());
- final Expression options = implementor.stash(this.options,
PipelineOptions.class);
final Expression node = implementor.stash((BeamRelNode) getInput(),
BeamRelNode.class);
- list.add(Expressions.call(BeamEnumerableConverter.class, "toEnumerable",
options, node));
+ list.add(Expressions.call(BeamEnumerableConverter.class, "toEnumerable",
node));
return implementor.result(physType, list.toBlock());
}
+ public static Enumerable<Object> toEnumerable(BeamRelNode node) {
+ final PipelineOptions options =
createPipelineOptions(node.getPipelineOptions());
+ return toEnumerable(options, node);
+ }
+
+ public static PipelineOptions createPipelineOptions(Map<String, String> map)
{
+ final String[] args = new String[map.size()];
+ int i = 0;
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ args[i++] = "--" + entry.getKey() + "=" + entry.getValue();
+ }
+ PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();
+ options.as(ApplicationNameOptions.class).setAppName("BeamSql");
+ return options;
+ }
+
public static Enumerable<Object> toEnumerable(PipelineOptions options,
BeamRelNode node) {
final ClassLoader originalClassLoader =
Thread.currentThread().getContextClassLoader();
try {
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index df457f387a5..3a478a8e80e 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
+import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -29,11 +30,17 @@
/** BeamRelNode to replace a {@code TableScan} node. */
public class BeamIOSourceRel extends TableScan implements BeamRelNode {
- private BeamSqlTable sqlTable;
+ private final BeamSqlTable sqlTable;
+ private final Map<String, String> pipelineOptions;
- public BeamIOSourceRel(RelOptCluster cluster, RelOptTable table,
BeamSqlTable sqlTable) {
+ public BeamIOSourceRel(
+ RelOptCluster cluster,
+ RelOptTable table,
+ BeamSqlTable sqlTable,
+ Map<String, String> pipelineOptions) {
super(cluster, cluster.traitSetOf(BeamLogicalConvention.INSTANCE), table);
this.sqlTable = sqlTable;
+ this.pipelineOptions = pipelineOptions;
}
@Override
@@ -52,4 +59,9 @@ public BeamIOSourceRel(RelOptCluster cluster, RelOptTable
table, BeamSqlTable sq
protected BeamSqlTable getBeamSqlTable() {
return sqlTable;
}
+
+ @Override
+ public Map<String, String> getPipelineOptions() {
+ return pipelineOptions;
+ }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 1cdde3315da..0fd2e9afb14 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.rel;
+import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -31,4 +32,16 @@
* DFS(Depth-First-Search) algorithm.
*/
PTransform<PCollectionTuple, PCollection<Row>> toPTransform();
+
+ /** Perform a DFS(Depth-First-Search) to find the PipelineOptions config. */
+ default Map<String, String> getPipelineOptions() {
+ Map<String, String> options = null;
+ for (RelNode input : getInputs()) {
+ Map<String, String> inputOptions = ((BeamRelNode)
input).getPipelineOptions();
+ assert inputOptions != null;
+ assert options == null || options == inputOptions;
+ options = inputOptions;
+ }
+ return options;
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index b5d7d5ac5b4..8f458d8e49f 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -41,7 +41,9 @@
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.jdbc.CalciteConnection;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
/** Test for {@link JdbcDriver}. */
public class JdbcDriverTest {
@@ -58,6 +60,18 @@
.addNullableField("nestedRow", Schema.FieldType.row(BASIC_SCHEMA))
.build();
+ private static final ReadOnlyTableProvider BOUNDED_TABLE =
+ new ReadOnlyTableProvider(
+ "test",
+ ImmutableMap.of(
+ "test",
+ MockedBoundedTable.of(
+ Schema.FieldType.INT32, "id",
+ Schema.FieldType.STRING, "name")
+ .addRows(1, "first")));
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
@Before
public void before() throws Exception {
Class.forName("org.apache.beam.sdk.extensions.sql.impl.JdbcDriver");
@@ -199,16 +213,7 @@ public void testInsertIntoCreatedTable() throws Exception {
@Test
public void testInternalConnect_boundedTable() throws Exception {
- ReadOnlyTableProvider tableProvider =
- new ReadOnlyTableProvider(
- "test",
- ImmutableMap.of(
- "test",
- MockedBoundedTable.of(
- Schema.FieldType.INT32, "id",
- Schema.FieldType.STRING, "name")
- .addRows(1, "first")));
- CalciteConnection connection = JdbcDriver.connect(tableProvider);
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM test");
assertTrue(resultSet.next());
@@ -239,4 +244,31 @@ private Row row(Object... values) {
private Row row(Schema schema, Object... values) {
return Row.withSchema(schema).addValues(values).build();
}
+
+ @Test
+ public void testInternalConnect_setDirectRunner() throws Exception {
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ Statement statement = connection.createStatement();
+ assertEquals(0, statement.executeUpdate("SET runner = direct"));
+ assertTrue(statement.execute("SELECT * FROM test"));
+ }
+
+ @Test
+ public void testInternalConnect_setBogusRunner() throws Exception {
+ thrown.expectMessage("Unknown 'runner' specified 'bogus'");
+
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ Statement statement = connection.createStatement();
+ assertEquals(0, statement.executeUpdate("SET runner = bogus"));
+ assertTrue(statement.execute("SELECT * FROM test"));
+ }
+
+ @Test
+ public void testInternalConnect_resetAll() throws Exception {
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ Statement statement = connection.createStatement();
+ assertEquals(0, statement.executeUpdate("SET runner = bogus"));
+ assertEquals(0, statement.executeUpdate("RESET ALL"));
+ assertTrue(statement.execute("SELECT * FROM test"));
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 113394)
Time Spent: 17h 10m (was: 17h)
> [SQL] Investigate JDBC interface for Beam SQL
> ---------------------------------------------
>
> Key: BEAM-3773
> URL: https://issues.apache.org/jira/browse/BEAM-3773
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Anton Kedin
> Assignee: Andrew Pilloud
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 17h 10m
> Remaining Estimate: 0h
>
> JDBC allows integration with a lot of third-party tools, e.g.
> [Zeppelin|https://zeppelin.apache.org/docs/0.7.0/manual/interpreters.html] and
> [sqlline|https://github.com/julianhyde/sqlline]. We should look into how
> feasible it is to implement a JDBC interface for Beam SQL.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)