Repository: incubator-beam
Updated Branches:
  refs/heads/master 3b1c2a3cf -> dde8e35ca


[BEAM-743] JdbcIO deals right getConnection() to use with DBCP BasicDataSource


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cb62002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cb62002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cb62002

Branch: refs/heads/master
Commit: 1cb6200232e78bb3b6cc0183a0738fd9e6f24e7c
Parents: 3b1c2a3
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Wed Oct 12 14:23:51 2016 +0200
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Wed Oct 19 08:12:31 2016 +0200

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 70 ++++++++++++--------
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 39 ++++++++++-
 2 files changed, 80 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cb62002/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 3bdbcce..505cdee 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -55,15 +55,20 @@ import org.apache.commons.dbcp2.BasicDataSource;
  * type returned by the provided {@link RowMapper}.
  *
  * <p>To configure the JDBC source, you have to provide a {@link 
DataSourceConfiguration} using
- * {@link DataSourceConfiguration#create} with either a {@link DataSource} 
(which must be
- * {@link Serializable}) or the parameters needed to create it (driver class 
name, url, and
- * optionally username and password). For example:
+ * {@link DataSourceConfiguration#create(DataSource)} or
+ * {@link DataSourceConfiguration#create(String, String)} with either a
+ * {@link DataSource} (which must be {@link Serializable}) or the parameters 
needed to create it
+ * (driver class name and url). Optionally, {@link 
DataSourceConfiguration#withUsername(String)} and
+ * {@link DataSourceConfiguration#withPassword(String)} allows you to define 
DataSource username
+ * and password.
+ * For example:
  *
  * <pre>{@code
  * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
  *   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
- *       "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
- *       "username", "password"))
+ *          "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
+ *        .withUsername("username")
+ *        .withPassword("password"))
  *   .withQuery("select id,name from Person")
  *   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
  *     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception 
{
@@ -85,8 +90,9 @@ import org.apache.commons.dbcp2.BasicDataSource;
  *   .apply(...)
  *   .apply(JdbcIO.<KV<Integer, String>>write()
  *      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
- *         "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
- *         "username", "password"))
+ *            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
+ *          .withUsername("username")
+ *          .withPassword("password"))
  *      .withStatement("insert into Person values(?, ?)")
  *      .withPreparedStatementSetter(new 
JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
  *        public void setParameters(KV<Integer, String> element, 
PreparedStatement query) {
@@ -143,29 +149,41 @@ public class JdbcIO {
     @Nullable abstract String getPassword();
     @Nullable abstract DataSource getDataSource();
 
-    /** Configuration using a {@link Serializable} {@link DataSource}. */
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setDriverClassName(String driverClassName);
+      abstract Builder setUrl(String url);
+      abstract Builder setUsername(String username);
+      abstract Builder setPassword(String password);
+      abstract Builder setDataSource(DataSource dataSource);
+      abstract DataSourceConfiguration build();
+    }
+
     public static DataSourceConfiguration create(DataSource dataSource) {
       checkNotNull(dataSource, "dataSource");
       checkArgument(dataSource instanceof Serializable, "dataSource must be 
Serializable");
-      return new AutoValue_JdbcIO_DataSourceConfiguration(null, null, null, 
null, dataSource);
+      return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
+          .setDataSource(dataSource)
+          .build();
     }
 
-    /** Configuration using the given driver, url, username and password. */
-    public static DataSourceConfiguration create(
-        String driverClassName, String url, String username, String password) {
+    public static DataSourceConfiguration create(String driverClassName, 
String url) {
       checkNotNull(driverClassName, "driverClassName");
       checkNotNull(url, "url");
-      checkNotNull(username, "username");
-      checkNotNull(password, "password");
-      return new AutoValue_JdbcIO_DataSourceConfiguration(
-          driverClassName, url, username, password, null);
+      return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
+          .setDriverClassName(driverClassName)
+          .setUrl(url)
+          .build();
     }
 
-    /** Configuration using the given driver and url, without a username and 
password. */
-    public static DataSourceConfiguration create(String driverClassName, 
String url) {
-      checkNotNull(driverClassName, "driverClassName");
-      checkNotNull(url, "url");
-      return new AutoValue_JdbcIO_DataSourceConfiguration(driverClassName, 
url, null, null, null);
+    public DataSourceConfiguration withUsername(String username) {
+      return builder().setUsername(username).build();
+    }
+
+    public DataSourceConfiguration withPassword(String password) {
+      return builder().setPassword(password).build();
     }
 
     private void populateDisplayData(DisplayData.Builder builder) {
@@ -179,20 +197,18 @@ public class JdbcIO {
     }
 
     Connection getConnection() throws Exception {
-      DataSource dataSource;
       if (getDataSource() != null) {
-        dataSource = getDataSource();
+        return (getUsername() != null)
+            ? getDataSource().getConnection(getUsername(), getPassword())
+            : getDataSource().getConnection();
       } else {
         BasicDataSource basicDataSource = new BasicDataSource();
         basicDataSource.setDriverClassName(getDriverClassName());
         basicDataSource.setUrl(getUrl());
         basicDataSource.setUsername(getUsername());
         basicDataSource.setPassword(getPassword());
-        dataSource = basicDataSource;
+        return basicDataSource.getConnection();
       }
-      return (getUsername() == null)
-          ? dataSource.getConnection()
-          : dataSource.getConnection(getUsername(), getPassword());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cb62002/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index b3073a2..860ca0f 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -119,8 +119,7 @@ public class JdbcIOTest implements Serializable {
 
   @Test
   public void testDataSourceConfigurationDataSource() throws Exception {
-    JdbcIO.DataSourceConfiguration config = 
JdbcIO.DataSourceConfiguration.create(
-        dataSource);
+    JdbcIO.DataSourceConfiguration config = 
JdbcIO.DataSourceConfiguration.create(dataSource);
     try (Connection conn = config.getConnection()) {
       assertTrue(conn.isValid(0));
     }
@@ -137,6 +136,42 @@ public class JdbcIOTest implements Serializable {
   }
 
   @Test
+  public void testDataSourceConfigurationUsernameAndPassword() throws 
Exception {
+    JdbcIO.DataSourceConfiguration config = 
JdbcIO.DataSourceConfiguration.create(
+        "org.apache.derby.jdbc.ClientDriver",
+        "jdbc:derby://localhost:1527/target/beam")
+        .withUsername("sa")
+        .withPassword("sa");
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
+  public void testDataSourceConfigurationNullPassword() throws Exception {
+    JdbcIO.DataSourceConfiguration config = 
JdbcIO.DataSourceConfiguration.create(
+        "org.apache.derby.jdbc.ClientDriver",
+        "jdbc:derby://localhost:1527/target/beam")
+        .withUsername("sa")
+        .withPassword(null);
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
+  public void testDataSourceConfigurationNullUsernameAndPassword() throws 
Exception {
+    JdbcIO.DataSourceConfiguration config = 
JdbcIO.DataSourceConfiguration.create(
+        "org.apache.derby.jdbc.ClientDriver",
+        "jdbc:derby://localhost:1527/target/beam")
+        .withUsername(null)
+        .withPassword(null);
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testRead() throws Exception {
     TestPipeline pipeline = TestPipeline.create();

Reply via email to