Repository: incubator-beam
Updated Branches:
  refs/heads/master bc80ee342 -> c5c343659
[BEAM-244] Add JDBC IO

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/19fad184
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/19fad184
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/19fad184

Branch: refs/heads/master
Commit: 19fad184ac0f8521770dff96bdad5bff2ef9aa03
Parents: bc80ee3
Author: Jean-Baptiste Onofré <[email protected]>
Authored: Mon Sep 5 12:57:14 2016 +0200
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Sun Oct 2 10:35:03 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/jdbc/pom.xml                       | 138 ++++
 .../org/apache/beam/sdk/io/jdbc/JdbcIO.java     | 427 +++++++++++++++++++
 .../apache/beam/sdk/io/jdbc/package-info.java   |  22 +
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 236 ++++++++++
 sdks/java/io/pom.xml                            |   1 +
 5 files changed, 824 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
new file mode 100644
index 0000000..75eb5ed
--- /dev/null
+++ b/sdks/java/io/jdbc/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.3.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>

+  <artifactId>beam-sdks-java-io-jdbc</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: JDBC</name>
+  <description>IO to read from and write to JDBC data sources.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+      <version>2.1.1</version>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <version>10.12.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derbyclient</artifactId>
+      <version>10.12.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derbynet</artifactId>
+      <version>10.12.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
new file mode 100644
index 0000000..f4c3cab
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Random;
+
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.dbcp2.BasicDataSource;
+
+/**
+ * IO to read from and write to JDBC data sources.
+ *
+ * <h3>Reading from a JDBC datasource</h3>
+ *
+ * <p>The JdbcIO source returns a bounded collection of {@code T} as a {@code PCollection<T>},
+ * where {@code T} is the type returned by the provided {@link RowMapper}.
+ *
+ * <p>To configure the JDBC source, you have to provide a {@link DataSourceConfiguration} using
+ * {@link DataSourceConfiguration#create} with either a {@link DataSource} (which must be
+ * {@link Serializable}) or the parameters needed to create it (driver class name, URL, and
+ * optionally username and password). For example:
+ *
+ * <pre>{@code
+ * pipeline.apply(JdbcIO.<KV<Integer, String>>read()
+ *   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ *       "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ *       "username", "password"))
+ *   .withQuery("select id,name from Person")
+ *   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
+ *     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
+ *       return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ *     }
+ *   })
+ *   .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
+ * }</pre>
+ *
+ * <h3>Writing to a JDBC datasource</h3>
+ *
+ * <p>The JDBC sink supports writing records into a database.
+ * It writes a {@link PCollection} to the database by converting each element into a
+ * {@link PreparedStatement} via a user-provided {@link PreparedStatementSetter}.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a
+ * {@link DataSourceConfiguration}. For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(JdbcIO.<KV<Integer, String>>write()
+ *     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+ *         "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
+ *         "username", "password"))
+ *     .withStatement("insert into Person values(?, ?)")
+ *     .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+ *       public void setParameters(KV<Integer, String> element, PreparedStatement query)
+ *           throws Exception {
+ *         query.setInt(1, element.getKey());
+ *         query.setString(2, element.getValue());
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <p>NB: in case of transient failures, Beam runners may execute parts of JdbcIO.Write multiple
+ * times for fault tolerance. Because of that, you should avoid using {@code INSERT} statements,
+ * since that risks duplicating records in the database, or failing due to primary key conflicts.
+ * Consider using <a href="https://en.wikipedia.org/wiki/Merge_(SQL)">MERGE ("upsert")
+ * statements</a> supported by your database instead (a dialect-specific example is sketched at
+ * the end of this mail).
+ */
+public class JdbcIO {
+  /**
+   * Read data from a JDBC datasource.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return new AutoValue_JdbcIO_Read.Builder<T>().build();
+  }
+
+  /**
+   * Write data to a JDBC datasource.
+   *
+   * @param <T> Type of the data to be written.
+   */
+  public static <T> Write<T> write() {
+    return new AutoValue_JdbcIO_Write.Builder<T>().build();
+  }
+
+  private JdbcIO() {}
+
+  /**
+   * An interface used by {@link JdbcIO.Read} for converting each row of the {@link ResultSet}
+   * into an element of the resulting {@link PCollection}.
+   */
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(ResultSet resultSet) throws Exception;
+  }
+
+  /**
+   * A POJO describing a {@link DataSource}, either providing a {@link DataSource} directly or
+   * all the properties needed to create one.
+   */
+  @AutoValue
+  abstract static class DataSourceConfiguration implements Serializable {
+    @Nullable abstract String getDriverClassName();
+    @Nullable abstract String getUrl();
+    @Nullable abstract String getUsername();
+    @Nullable abstract String getPassword();
+    @Nullable abstract DataSource getDataSource();
+
+    /** Configuration using a {@link Serializable} {@link DataSource}. */
+    public static DataSourceConfiguration create(DataSource dataSource) {
+      checkNotNull(dataSource, "dataSource");
+      checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
+      return new AutoValue_JdbcIO_DataSourceConfiguration(null, null, null, null, dataSource);
+    }
+
+    /** Configuration using the given driver, url, username and password. */
+    public static DataSourceConfiguration create(
+        String driverClassName, String url, String username, String password) {
+      checkNotNull(driverClassName, "driverClassName");
+      checkNotNull(url, "url");
+      checkNotNull(username, "username");
+      checkNotNull(password, "password");
+      return new AutoValue_JdbcIO_DataSourceConfiguration(
+          driverClassName, url, username, password, null);
+    }
+
+    /** Configuration using the given driver and url, without a username and password. */
+    public static DataSourceConfiguration create(String driverClassName, String url) {
+      checkNotNull(driverClassName, "driverClassName");
+      checkNotNull(url, "url");
+      return new AutoValue_JdbcIO_DataSourceConfiguration(driverClassName, url, null, null, null);
+    }
+
+    private void populateDisplayData(DisplayData.Builder builder) {
+      if (getDataSource() != null) {
+        builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));
+      } else {
+        builder.addIfNotNull(DisplayData.item("jdbcDriverClassName", getDriverClassName()));
+        builder.addIfNotNull(DisplayData.item("jdbcUrl", getUrl()));
+        builder.addIfNotNull(DisplayData.item("username", getUsername()));
+      }
+    }
+
+    Connection getConnection() throws Exception {
+      DataSource dataSource;
+      if (getDataSource() != null) {
+        dataSource = getDataSource();
+      } else {
+        BasicDataSource basicDataSource = new BasicDataSource();
+        basicDataSource.setDriverClassName(getDriverClassName());
+        basicDataSource.setUrl(getUrl());
+        basicDataSource.setUsername(getUsername());
+        basicDataSource.setPassword(getPassword());
+        dataSource = basicDataSource;
+      }
+      return (getUsername() == null)
+          ? dataSource.getConnection()
+          : dataSource.getConnection(getUsername(), getPassword());
+    }
+  }
+
+  /** A {@link PTransform} to read data from a JDBC datasource. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
+    @Nullable abstract String getQuery();
+    @Nullable abstract RowMapper<T> getRowMapper();
+    @Nullable abstract Coder<T> getCoder();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
+      abstract Builder<T> setQuery(String query);
+      abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(DataSourceConfiguration configuration) {
+      checkNotNull(configuration, "configuration");
+      return toBuilder().setDataSourceConfiguration(configuration).build();
+    }
+
+    public Read<T> withQuery(String query) {
+      checkNotNull(query, "query");
+      return toBuilder().setQuery(query).build();
+    }
+
+    public Read<T> withRowMapper(RowMapper<T> rowMapper) {
+      checkNotNull(rowMapper, "rowMapper");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      checkNotNull(coder, "coder");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> apply(PBegin input) {
+      return input
+          .apply(Create.of(getQuery()))
+          .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
+          // generate a random key followed by a GroupByKey and then ungroup
+          // to prevent fusion
+          // see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
+          // for details
+          .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
+            private Random random;
+            @Setup
+            public void setup() {
+              random = new Random();
+            }
+            @ProcessElement
+            public void processElement(ProcessContext context) {
+              context.output(KV.of(random.nextInt(), context.element()));
+            }
+          }))
+          .apply(GroupByKey.<Integer, T>create())
+          .apply(Values.<Iterable<T>>create())
+          .apply(Flatten.<T>iterables());
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(getQuery(), "query");
+      checkNotNull(getRowMapper(), "rowMapper");
+      checkNotNull(getCoder(), "coder");
+      checkNotNull(getDataSourceConfiguration(), "dataSourceConfiguration");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("query", getQuery()));
+      builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+      builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+      getDataSourceConfiguration().populateDisplayData(builder);
+    }
+
+    /** A {@link DoFn} executing the SQL query to read from the database. */
+    static class ReadFn<T> extends DoFn<String, T> {
+      private JdbcIO.Read<T> spec;
+      private Connection connection;
+
+      private ReadFn(Read<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = spec.getDataSourceConfiguration().getConnection();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        String query = context.element();
+        try (PreparedStatement statement = connection.prepareStatement(query)) {
+          try (ResultSet resultSet = statement.executeQuery()) {
+            while (resultSet.next()) {
+              context.output(spec.getRowMapper().mapRow(resultSet));
+            }
+          }
+        }
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        if (connection != null) {
+          connection.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * An interface used by {@link JdbcIO.Write} to set the parameters of the
+   * {@link PreparedStatement} used to write data to the database.
+   */
+  public interface PreparedStatementSetter<T> extends Serializable {
+    void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
+  }
+
+  /** A {@link PTransform} to write to a JDBC datasource. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
+    @Nullable abstract String getStatement();
+    @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
+      abstract Builder<T> setStatement(String statement);
+      abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
+
+      abstract Write<T> build();
+    }
+
+    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+      return toBuilder().setDataSourceConfiguration(config).build();
+    }
+    public Write<T> withStatement(String statement) {
+      return toBuilder().setStatement(statement).build();
+    }
+    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+      return toBuilder().setPreparedStatementSetter(setter).build();
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      input.apply(ParDo.of(new WriteFn<T>(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<T> input) {
+      checkNotNull(getDataSourceConfiguration(), "dataSourceConfiguration");
+      checkNotNull(getStatement(), "statement");
+      checkNotNull(getPreparedStatementSetter(), "preparedStatementSetter");
+    }
+
+    private static class WriteFn<T> extends DoFn<T, Void> {
+      private static final int DEFAULT_BATCH_SIZE = 1000;
+
+      private final Write<T> spec;
+
+      private Connection connection;
+      private PreparedStatement preparedStatement;
+      private int batchCount;
+
+      public WriteFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = spec.getDataSourceConfiguration().getConnection();
+        connection.setAutoCommit(false);
+        preparedStatement = connection.prepareStatement(spec.getStatement());
+      }
+
+      @StartBundle
+      public void startBundle(Context context) {
+        batchCount = 0;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        T record = context.element();
+
+        preparedStatement.clearParameters();
+        spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
+        preparedStatement.addBatch();
+
+        batchCount++;
+
+        if (batchCount >= DEFAULT_BATCH_SIZE) {
+          finishBundle(context);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle(Context context) throws Exception {
+        if (batchCount > 0) {
+          preparedStatement.executeBatch();
+          connection.commit();
+          batchCount = 0;
+        }
+      }
+
+      @Teardown
+      public void teardown() throws Exception {
+        try {
+          if (preparedStatement != null) {
+            preparedStatement.close();
+          }
+        } finally {
+          if (connection != null) {
+            connection.close();
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
new file mode 100644
index 0000000..b568800
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from and writing to JDBC.
+ */
+package org.apache.beam.sdk.io.jdbc;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
new file mode 100644
index 0000000..b3073a2
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.derby.jdbc.ClientDataSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link JdbcIO}.
+ */
+public class JdbcIOTest implements Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcIOTest.class);
+
+  private static NetworkServerControl derbyServer;
+  private static ClientDataSource dataSource;
+
+  @BeforeClass
+  public static void startDatabase() throws Exception {
+    System.setProperty("derby.stream.error.file", "target/derby.log");
+
+    derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"), 1527);
+    derbyServer.start(null);
+
+    dataSource = new ClientDataSource();
+    dataSource.setCreateDatabase("create");
+    dataSource.setDatabaseName("target/beam");
+    dataSource.setServerName("localhost");
+    dataSource.setPortNumber(1527);
+
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate("create table BEAM(id INT, name VARCHAR(500))");
+      }
+    }
+  }
+
+  @AfterClass
+  public static void shutDownDatabase() throws Exception {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate("drop table BEAM");
+      }
+    } finally {
+      if (derbyServer != null) {
+        derbyServer.shutdown();
+      }
+    }
+  }
+
+  @Before
+  public void initTable() throws Exception {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate("delete from BEAM");
+      }
+
+      String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+          "Newton", "Bohr", "Galilei", "Maxwell"};
+      connection.setAutoCommit(false);
+      try (PreparedStatement preparedStatement =
+               connection.prepareStatement("insert into BEAM values (?,?)")) {
+        for (int i = 0; i < 1000; i++) {
+          int index = i % scientists.length;
+          preparedStatement.clearParameters();
+          preparedStatement.setInt(1, i);
+          preparedStatement.setString(2, scientists[index]);
+          preparedStatement.executeUpdate();
+        }
+      }
+
+      connection.commit();
+    }
+  }
+
+  @Test
+  public void testDataSourceConfigurationDataSource() throws Exception {
+    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
+  public void testDataSourceConfigurationDriverAndUrl() throws Exception {
+    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(
+        "org.apache.derby.jdbc.ClientDriver",
+        "jdbc:derby://localhost:1527/target/beam");
+    try (Connection conn = config.getConnection()) {
+      assertTrue(conn.isValid(0));
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testRead() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline.apply(
+        JdbcIO.<KV<String, Integer>>read()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+            .withQuery("select name,id from BEAM")
+            .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
+              @Override
+              public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
+                KV<String, Integer> kv =
+                    KV.of(resultSet.getString("name"), resultSet.getInt("id"));
+                return kv;
+              }
+            })
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+
+    PAssert.thatSingleton(
+        output.apply("Count All", Count.<KV<String, Integer>>globally()))
+        .isEqualTo(1000L);
+
+    PAssert.that(output
+        .apply("Count Scientist", Count.<String, Integer>perKey())
+    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+      @Override
+      public Void apply(Iterable<KV<String, Long>> input) {
+        for (KV<String, Long> element : input) {
+          assertEquals(element.getKey(), 100L, element.getValue().longValue());
+        }
+        return null;
+      }
+    });
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWrite() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    ArrayList<KV<Integer, String>> data = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      KV<Integer, String> kv = KV.of(i, "Test");
+      data.add(kv);
+    }
+    pipeline.apply(Create.of(data))
+        .apply(JdbcIO.<KV<Integer, String>>write()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+                "org.apache.derby.jdbc.ClientDriver",
+                "jdbc:derby://localhost:1527/target/beam"))
+            .withStatement("insert into BEAM values(?, ?)")
+            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+              public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+                  throws Exception {
+                statement.setInt(1, element.getKey());
+                statement.setString(2, element.getValue());
+              }
+            }));
+
+    pipeline.run();
+
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        try (ResultSet resultSet = statement.executeQuery("select count(*) from BEAM")) {
+          resultSet.next();
+          int count = resultSet.getInt(1);
+
+          Assert.assertEquals(2000, count);
+        }
+      }
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithEmptyPCollection() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    pipeline.apply(Create.of(new ArrayList<KV<Integer, String>>()))
+        .apply(JdbcIO.<KV<Integer, String>>write()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+                "org.apache.derby.jdbc.ClientDriver",
+                "jdbc:derby://localhost:1527/target/beam"))
+            .withStatement("insert into BEAM values(?, ?)")
+            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+              public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+                  throws Exception {
+                statement.setInt(1, element.getKey());
+                statement.setString(2, element.getValue());
+              }
+            }));
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19fad184/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index c4c32ed..82cf8d0 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -39,6 +39,7 @@
     <module>kafka</module>
    <module>kinesis</module>
     <module>mongodb</module>
+    <module>jdbc</module>
   </modules>
 </project>

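----------------------------------------------------------------------

A note on the MERGE/upsert recommendation in the JdbcIO class javadoc above: the write
statement is entirely user-supplied via withStatement(), so making the write idempotent is a
matter of choosing an appropriate statement. Below is a minimal sketch, not part of the
commit, assuming a MySQL database where Person.id is the primary key; the ON DUPLICATE KEY
syntax is MySQL-specific, and most other databases express the same idea with MERGE.

    PCollection<KV<Integer, String>> persons = ...;
    persons.apply(JdbcIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
            "username", "password"))
        // If a runner retries a bundle, re-executing this statement overwrites the
        // existing row instead of duplicating it or hitting a primary key conflict.
        .withStatement("insert into Person (id, name) values (?, ?) "
            + "on duplicate key update name = values(name)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
          public void setParameters(KV<Integer, String> element, PreparedStatement statement)
              throws Exception {
            statement.setInt(1, element.getKey());
            statement.setString(2, element.getValue());
          }
        }));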