[
https://issues.apache.org/jira/browse/BEAM-4642?focusedWorklogId=116978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116978
]
ASF GitHub Bot logged work on BEAM-4642:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Jun/18 19:43
Start Date: 28/Jun/18 19:43
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5759: [BEAM-4642]
Pipeline options in JDBC URI; default userAgent=BeamSQL
URL: https://github.com/apache/beam/pull/5759
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index 448a0d9ece7..8f5d429f4e2 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -22,22 +22,42 @@
import com.google.auto.service.AutoService;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.avatica.ConnectionProperty;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.schema.SchemaPlus;
-/** Calcite JDBC driver with Beam defaults. */
+/**
+ * Calcite JDBC driver with Beam defaults.
+ *
+ * <p>Connection URLs have this form:
+ *
+ * <p><code>jdbc:beam:param1=value1;param2=value2;param3=value3</code>
+ *
+ * <p>The querystring-style parameters are parsed as {@link PipelineOptions}.
+ */
@AutoService(java.sql.Driver.class)
public class JdbcDriver extends Driver {
public static final JdbcDriver INSTANCE = new JdbcDriver();
public static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
+
+ /**
+ * Querystring parameters that begin with {@code "beam."} will be
interpreted as {@link
+ * PipelineOptions}.
+ */
+ public static final String BEAM_QUERYSTRING_PREFIX = "beam.";
+
private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";
static {
@@ -87,6 +107,22 @@ public Connection connect(String url, Properties info)
throws SQLException {
// Beam schema may change without notifying Calcite
defaultSchema.setCacheEnabled(false);
+
+ // Set default PipelineOptions to which we apply the querystring
+ Map<String, String> pipelineOptionsMap =
+ ((BeamCalciteSchema)
CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
+ ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+ pipelineOptionsMap.put("userAgent", String.format("BeamSQL/%s",
releaseInfo.getVersion()));
+
+ String querystring = url.substring(CONNECT_STRING_PREFIX.length());
+ for (Map.Entry<Object, Object> propertyValue :
+ ConnectStringParser.parse(querystring).entrySet()) {
+ String name = (String) propertyValue.getKey();
+ if (name.startsWith(BEAM_QUERYSTRING_PREFIX)) {
+ pipelineOptionsMap.put(
+ name.substring(BEAM_QUERYSTRING_PREFIX.length()), (String)
propertyValue.getValue());
+ }
+ }
return connection;
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 508f1a11c14..7cb5eb64f0f 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -19,6 +19,9 @@
import static org.apache.beam.sdk.values.Row.toRow;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -33,6 +36,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.parser.TestTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
@@ -41,6 +45,8 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Before;
@@ -95,6 +101,46 @@ public void testDriverManager_simple() throws Exception {
assertTrue(statement.execute("SELECT 1"));
}
+ /** Tests that the userAgent is set in the pipeline options of the
connection. */
+ @Test
+ public void testDriverManager_defaultUserAgent() throws Exception {
+ Connection connection =
DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX);
+ SchemaPlus rootSchema = ((CalciteConnection) connection).getRootSchema();
+ BeamCalciteSchema beamSchema =
+ (BeamCalciteSchema)
CalciteSchema.from(rootSchema.getSubSchema("beam")).schema;
+ Map<String, String> pipelineOptions = beamSchema.getPipelineOptions();
+ assertThat(pipelineOptions.get("userAgent"), containsString("BeamSQL"));
+ }
+
+ /** Tests that userAgent can be overridden on the querystring. */
+ @Test
+ public void testDriverManager_setUserAgent() throws Exception {
+ Connection connection =
+ DriverManager.getConnection(
+ JdbcDriver.CONNECT_STRING_PREFIX + "beam.userAgent=Secret Agent");
+ SchemaPlus rootSchema = ((CalciteConnection) connection).getRootSchema();
+ BeamCalciteSchema beamSchema =
+ (BeamCalciteSchema)
CalciteSchema.from(rootSchema.getSubSchema("beam")).schema;
+ Map<String, String> pipelineOptions = beamSchema.getPipelineOptions();
+ assertThat(pipelineOptions.get("userAgent"), equalTo("Secret Agent"));
+ }
+
+ /** Tests that unknown pipeline options are passed verbatim from the JDBC
URI. */
+ @Test
+ public void testDriverManager_pipelineOptionsPlumbing() throws Exception {
+ Connection connection =
+ DriverManager.getConnection(
+ JdbcDriver.CONNECT_STRING_PREFIX
+ + "beam.foo=baz;beam.foobizzle=mahshizzle;other=smother");
+ SchemaPlus rootSchema = ((CalciteConnection) connection).getRootSchema();
+ BeamCalciteSchema beamSchema =
+ (BeamCalciteSchema)
CalciteSchema.from(rootSchema.getSubSchema("beam")).schema;
+ Map<String, String> pipelineOptions = beamSchema.getPipelineOptions();
+ assertThat(pipelineOptions.get("foo"), equalTo("baz"));
+ assertThat(pipelineOptions.get("foobizzle"), equalTo("mahshizzle"));
+ assertThat(pipelineOptions.get("other"), nullValue());
+ }
+
@Test
public void testDriverManager_parse() throws Exception {
Connection connection =
DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 116978)
Time Spent: 1h 40m (was: 1.5h)
> Allow setting PipelineOptions for JDBC connections
> --------------------------------------------------
>
> Key: BEAM-4642
> URL: https://issues.apache.org/jira/browse/BEAM-4642
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Currently you can set pipeline options only through {{SET}} commands. It
> would be convenient to set defaults in the JDBC connection string.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)