Repository: beam Updated Branches: refs/heads/master 5f972e8b2 -> 5fd2c6e13
JdbcIOIT now uses writeThenRead style Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6201ed1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6201ed1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6201ed1 Branch: refs/heads/master Commit: a6201ed1488d9ae95637002744bc316f72401e56 Parents: 5f972e8 Author: Stephen Sisk <s...@google.com> Authored: Fri Jun 16 11:04:07 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Jul 13 13:43:27 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/common/pom.xml | 10 + .../org/apache/beam/sdk/io/common/TestRow.java | 114 +++++++++++ sdks/java/io/jdbc/pom.xml | 10 +- .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 203 ++++++++++--------- .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 115 ++++++----- .../beam/sdk/io/jdbc/JdbcTestDataSet.java | 130 ------------ .../apache/beam/sdk/io/jdbc/JdbcTestHelper.java | 81 ++++++++ 7 files changed, 377 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml index df0d94b..1a6f54b 100644 --- a/sdks/java/io/common/pom.xml +++ b/sdks/java/io/common/pom.xml @@ -38,5 +38,15 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java new file mode 100644 index 0000000..5f0a2fb --- /dev/null +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.common; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Used to pass values around within test pipelines. + */ +@AutoValue +public abstract class TestRow implements Serializable, Comparable<TestRow> { + /** + * Manually create a test row. 
+ */ + public static TestRow create(Integer id, String name) { + return new AutoValue_TestRow(id, name); + } + + public abstract Integer id(); + public abstract String name(); + + public int compareTo(TestRow other) { + return id().compareTo(other.id()); + } + + /** + * Creates a {@link org.apache.beam.sdk.io.common.TestRow} from the seed value. + */ + public static TestRow fromSeed(Integer seed) { + return create(seed, getNameForSeed(seed)); + } + + /** + * Returns the name field value produced from the given seed. + */ + public static String getNameForSeed(Integer seed) { + return "Testval" + seed; + } + + /** + * Returns a range of {@link org.apache.beam.sdk.io.common.TestRow}s for seed values between + * rangeStart (inclusive) and rangeEnd (exclusive). + */ + public static Iterable<TestRow> getExpectedValues(int rangeStart, int rangeEnd) { + List<TestRow> ret = new ArrayList<TestRow>(rangeEnd - rangeStart + 1); + for (int i = rangeStart; i < rangeEnd; i++) { + ret.add(fromSeed(i)); + } + return ret; + } + + /** + * Uses the input Long values as seeds to produce {@link org.apache.beam.sdk.io.common.TestRow}s. + */ + public static class DeterministicallyConstructTestRowFn extends DoFn<Long, TestRow> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(fromSeed(c.element().intValue())); + } + } + + /** + * Outputs just the name stored in the {@link org.apache.beam.sdk.io.common.TestRow}. + */ + public static class SelectNameFn extends DoFn<TestRow, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().name()); + } + } + + /** + * Precalculated hashes - you can calculate an entry by running HashingFn on + * the name() for the rows generated from seeds in [0, n). 
+ */ + private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of( + 1000, "7d94d63a41164be058a9680002914358" + ); + + /** + * Returns the hash value that {@link org.apache.beam.sdk.io.common.HashingFn} will return when it + * is run on {@link org.apache.beam.sdk.io.common.TestRow}s produced by + * getExpectedValues(0, rowCount). + */ + public static String getExpectedHashForRowCount(int rowCount) + throws UnsupportedOperationException { + String hash = EXPECTED_HASHES.get(rowCount); + if (hash == null) { + throw new UnsupportedOperationException("No hash for that row count"); + } + return hash; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml index 050fc6a..e5f4d7e 100644 --- a/sdks/java/io/jdbc/pom.xml +++ b/sdks/java/io/jdbc/pom.xml @@ -105,11 +105,6 @@ <version>2.1.1</version> </dependency> - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - <!-- compile dependencies --> <dependency> <groupId>com.google.auto.value</groupId> @@ -168,5 +163,10 @@ <scope>test</scope> <classifier>tests</classifier> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java index e8ffad6..32d6d9e 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java 
@@ -17,41 +17,39 @@ */ package org.apache.beam.sdk.io.jdbc; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; +import java.text.ParseException; import java.util.List; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; + +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.io.common.TestRow; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.values.PCollection; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.postgresql.ds.PGSimpleDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance. * - * <p>This test requires a running instance of Postgres, and the test dataset must exist in the - * database. `JdbcTestDataSet` will create the read table. - * - * <p>You can run this test by doing the following: + * <p>This test requires a running instance of Postgres. 
Pass in connection information using + * PipelineOptions: * <pre> * mvn -e -Pio-it verify -pl sdks/java/io/jdbc -DintegrationTestPipelineOptions='[ * "--postgresServerName=1.2.3.4", @@ -67,112 +65,123 @@ import org.postgresql.ds.PGSimpleDataSource; */ @RunWith(JUnit4.class) public class JdbcIOIT { + private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class); + public static final int EXPECTED_ROW_COUNT = 1000; private static PGSimpleDataSource dataSource; - private static String writeTableName; + private static String tableName; + + @Rule + public TestPipeline pipelineWrite = TestPipeline.create(); + @Rule + public TestPipeline pipelineRead = TestPipeline.create(); @BeforeClass - public static void setup() throws SQLException { + public static void setup() throws SQLException, ParseException { PipelineOptionsFactory.register(IOTestPipelineOptions.class); IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() .as(IOTestPipelineOptions.class); - // We do dataSource set up in BeforeClass rather than Before since we don't need to create a new - // dataSource for each test. - dataSource = JdbcTestDataSet.getDataSource(options); - } + dataSource = getDataSource(options); - @AfterClass - public static void tearDown() throws SQLException { - // Only do write table clean up once for the class since we don't want to clean up after both - // read and write tests, only want to do it once after all the tests are done. 
- JdbcTestDataSet.cleanUpDataTable(dataSource, writeTableName); + tableName = JdbcTestHelper.getTableName("IT"); + JdbcTestHelper.createDataTable(dataSource, tableName); } - private static class CreateKVOfNameAndId implements JdbcIO.RowMapper<KV<String, Integer>> { - @Override - public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception { - KV<String, Integer> kv = - KV.of(resultSet.getString("name"), resultSet.getInt("id")); - return kv; - } - } + private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) + throws SQLException { + PGSimpleDataSource dataSource = new PGSimpleDataSource(); - private static class PutKeyInColumnOnePutValueInColumnTwo - implements JdbcIO.PreparedStatementSetter<KV<Integer, String>> { - @Override - public void setParameters(KV<Integer, String> element, PreparedStatement statement) - throws SQLException { - statement.setInt(1, element.getKey()); - statement.setString(2, element.getValue()); - } + dataSource.setDatabaseName(options.getPostgresDatabaseName()); + dataSource.setServerName(options.getPostgresServerName()); + dataSource.setPortNumber(options.getPostgresPort()); + dataSource.setUser(options.getPostgresUsername()); + dataSource.setPassword(options.getPostgresPassword()); + dataSource.setSsl(options.getPostgresSsl()); + + return dataSource; } - @Rule - public TestPipeline pipeline = TestPipeline.create(); + @AfterClass + public static void tearDown() throws SQLException { + JdbcTestHelper.cleanUpDataTable(dataSource, tableName); + } /** - * Does a test read of a few rows from a postgres database. - * - * <p>Note that IT read tests must not do any data table manipulation (setup/clean up.) - * @throws SQLException + * Tests writing then reading data for a postgres database. 
*/ @Test - public void testRead() throws SQLException { - String writeTableName = JdbcTestDataSet.READ_TABLE_NAME; + public void testWriteThenRead() { + runWrite(); + runRead(); + } - PCollection<KV<String, Integer>> output = pipeline.apply(JdbcIO.<KV<String, Integer>>read() + /** + * Writes the test dataset to postgres. + * + * <p>This method does not attempt to validate the data - we do so in the read test. This does + * make it harder to tell whether a test failed in the write or read phase, but the tests are much + * easier to maintain (don't need any separate code to write test data for read tests to + * the database.) + */ + private void runWrite() { + pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT)) + .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) + .apply(JdbcIO.<TestRow>write() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) - .withQuery("select name,id from " + writeTableName) - .withRowMapper(new CreateKVOfNameAndId()) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - // TODO: validate actual contents of rows, not just count. - PAssert.thatSingleton( - output.apply("Count All", Count.<KV<String, Integer>>globally())) - .isEqualTo(1000L); + .withStatement(String.format("insert into %s values(?, ?)", tableName)) + .withPreparedStatementSetter(new JdbcTestHelper.PrepareStatementFromTestRow())); - List<KV<String, Long>> expectedCounts = new ArrayList<>(); - for (String scientist : JdbcTestDataSet.SCIENTISTS) { - expectedCounts.add(KV.of(scientist, 100L)); - } - PAssert.that(output.apply("Count Scientist", Count.<String, Integer>perKey())) - .containsInAnyOrder(expectedCounts); - - pipeline.run().waitUntilFinish(); + pipelineWrite.run().waitUntilFinish(); } /** - * Tests writes to a postgres database. + * Read the test dataset from postgres and validate its contents. + * + * <p>When doing the validation, we wish to ensure that we: + * 1. 
Ensure *all* the rows are correct + * 2. Provide enough information in assertions such that it is easy to spot obvious errors (e.g. + * all elements have a similar mistake, or "only 5 elements were generated" and the user wants + * to see what the problem was). * - * <p>Write Tests must clean up their data - in this case, it uses a new table every test run so - * that it won't interfere with read tests/other write tests. It uses finally to attempt to - * clean up data at the end of the test run. - * @throws SQLException + * <p>We do not wish to generate and compare all of the expected values, so this method uses + * hashing to ensure that all expected data is present. However, hashing does not provide easy + * debugging information (failures like "every element was empty string" are hard to see), + * so we also: + * 1. Generate expected values for the first and last 500 rows + * 2. Use containsInAnyOrder to verify that their values are correct. + * Where first/last 500 rows is determined by the fact that we know all rows have a unique id - we + * can use the natural ordering of that key. 
*/ - @Test - public void testWrite() throws SQLException { - writeTableName = JdbcTestDataSet.createWriteDataTable(dataSource); - - ArrayList<KV<Integer, String>> data = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { - KV<Integer, String> kv = KV.of(i, "Test"); - data.add(kv); - } - pipeline.apply(Create.of(data)) - .apply(JdbcIO.<KV<Integer, String>>write() - .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) - .withStatement(String.format("insert into %s values(?, ?)", writeTableName)) - .withPreparedStatementSetter(new PutKeyInColumnOnePutValueInColumnTwo())); - - pipeline.run().waitUntilFinish(); - - try (Connection connection = dataSource.getConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery("select count(*) from " + writeTableName)) { - resultSet.next(); - int count = resultSet.getInt(1); - Assert.assertEquals(2000, count); - } - // TODO: Actually verify contents of the rows. + private void runRead() { + PCollection<TestRow> namesAndIds = + pipelineRead.apply(JdbcIO.<TestRow>read() + .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) + .withQuery(String.format("select name,id from %s;", tableName)) + .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId()) + .withCoder(SerializableCoder.of(TestRow.class))); + + PAssert.thatSingleton( + namesAndIds.apply("Count All", Count.<TestRow>globally())) + .isEqualTo((long) EXPECTED_ROW_COUNT); + + PCollection<String> consolidatedHashcode = namesAndIds + .apply(ParDo.of(new TestRow.SelectNameFn())) + .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()); + PAssert.that(consolidatedHashcode) + .containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT)); + + PCollection<List<TestRow>> frontOfList = + namesAndIds.apply(Top.<TestRow>smallest(500)); + Iterable<TestRow> expectedFrontOfList = TestRow.getExpectedValues(0, 500); + 
PAssert.thatSingletonIterable(frontOfList).containsInAnyOrder(expectedFrontOfList); + + PCollection<List<TestRow>> backOfList = + namesAndIds.apply(Top.<TestRow>largest(500)); + Iterable<TestRow> expectedBackOfList = + TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500, + EXPECTED_ROW_COUNT); + PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList); + + pipelineRead.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 984ce1a..4ea18ef 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.jdbc; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.PrintWriter; @@ -28,18 +27,22 @@ import java.net.ServerSocket; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import java.util.Collections; +import javax.sql.DataSource; + import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.common.TestRow; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import 
org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.derby.drda.NetworkServerControl; @@ -58,11 +61,13 @@ import org.slf4j.LoggerFactory; */ public class JdbcIOTest implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class); + public static final int EXPECTED_ROW_COUNT = 1000; private static NetworkServerControl derbyServer; private static ClientDataSource dataSource; private static int port; + private static String readTableName; @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -108,14 +113,16 @@ public class JdbcIOTest implements Serializable { dataSource.setServerName("localhost"); dataSource.setPortNumber(port); + readTableName = JdbcTestHelper.getTableName("UT_READ"); - JdbcTestDataSet.createReadDataTable(dataSource); + JdbcTestHelper.createDataTable(dataSource, readTableName); + addInitialData(dataSource, readTableName); } @AfterClass public static void shutDownDatabase() throws Exception { try { - JdbcTestDataSet.cleanUpDataTable(dataSource, JdbcTestDataSet.READ_TABLE_NAME); + JdbcTestHelper.cleanUpDataTable(dataSource, readTableName); } finally { if (derbyServer != null) { derbyServer.shutdown(); @@ -177,39 +184,43 @@ public class JdbcIOTest implements Serializable { } } + /** + * Create test data that is consistent with that generated by TestRow. 
+ */ + private static void addInitialData(DataSource dataSource, String tableName) + throws SQLException { + try (Connection connection = dataSource.getConnection()) { + connection.setAutoCommit(false); + try (PreparedStatement preparedStatement = + connection.prepareStatement( + String.format("insert into %s values (?,?)", tableName))) { + for (int i = 0; i < EXPECTED_ROW_COUNT; i++) { + preparedStatement.clearParameters(); + preparedStatement.setInt(1, i); + preparedStatement.setString(2, TestRow.getNameForSeed(i)); + preparedStatement.executeUpdate(); + } + } + connection.commit(); + } + } + @Test @Category(NeedsRunner.class) public void testRead() throws Exception { - - PCollection<KV<String, Integer>> output = pipeline.apply( - JdbcIO.<KV<String, Integer>>read() + PCollection<TestRow> rows = pipeline.apply( + JdbcIO.<TestRow>read() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) - .withQuery("select name,id from " + JdbcTestDataSet.READ_TABLE_NAME) - .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() { - @Override - public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception { - KV<String, Integer> kv = - KV.of(resultSet.getString("name"), resultSet.getInt("id")); - return kv; - } - }) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + .withQuery("select name,id from " + readTableName) + .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId()) + .withCoder(SerializableCoder.of(TestRow.class))); PAssert.thatSingleton( - output.apply("Count All", Count.<KV<String, Integer>>globally())) - .isEqualTo(1000L); - - PAssert.that(output - .apply("Count Scientist", Count.<String, Integer>perKey()) - ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() { - @Override - public Void apply(Iterable<KV<String, Long>> input) { - for (KV<String, Long> element : input) { - assertEquals(element.getKey(), 100L, element.getValue().longValue()); - } - return null; - } - }); + 
rows.apply("Count All", Count.<TestRow>globally())) + .isEqualTo((long) EXPECTED_ROW_COUNT); + + Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0, EXPECTED_ROW_COUNT); + PAssert.that(rows).containsInAnyOrder(expectedValues); pipeline.run(); } @@ -217,32 +228,27 @@ public class JdbcIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReadWithSingleStringParameter() throws Exception { - - PCollection<KV<String, Integer>> output = pipeline.apply( - JdbcIO.<KV<String, Integer>>read() + PCollection<TestRow> rows = pipeline.apply( + JdbcIO.<TestRow>read() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)) .withQuery(String.format("select name,id from %s where name = ?", - JdbcTestDataSet.READ_TABLE_NAME)) + readTableName)) .withStatementPreparator(new JdbcIO.StatementPreparator() { @Override public void setParameters(PreparedStatement preparedStatement) - throws Exception { - preparedStatement.setString(1, "Darwin"); - } - }) - .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() { - @Override - public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception { - KV<String, Integer> kv = - KV.of(resultSet.getString("name"), resultSet.getInt("id")); - return kv; + throws Exception { + preparedStatement.setString(1, TestRow.getNameForSeed(1)); } }) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId()) + .withCoder(SerializableCoder.of(TestRow.class))); PAssert.thatSingleton( - output.apply("Count One Scientist", Count.<KV<String, Integer>>globally())) - .isEqualTo(100L); + rows.apply("Count All", Count.<TestRow>globally())) + .isEqualTo(1L); + + Iterable<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed(1)); + PAssert.that(rows).containsInAnyOrder(expectedValues); pipeline.run(); } @@ -250,11 +256,13 @@ public class JdbcIOTest implements Serializable { @Test 
@Category(NeedsRunner.class) public void testWrite() throws Exception { + final long rowsToAdd = 1000L; - String tableName = JdbcTestDataSet.createWriteDataTable(dataSource); + String tableName = JdbcTestHelper.getTableName("UT_WRITE"); + JdbcTestHelper.createDataTable(dataSource, tableName); try { ArrayList<KV<Integer, String>> data = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < rowsToAdd; i++) { KV<Integer, String> kv = KV.of(i, "Test"); data.add(kv); } @@ -282,19 +290,18 @@ public class JdbcIOTest implements Serializable { resultSet.next(); int count = resultSet.getInt(1); - Assert.assertEquals(2000, count); + Assert.assertEquals(EXPECTED_ROW_COUNT, count); } } } } finally { - JdbcTestDataSet.cleanUpDataTable(dataSource, tableName); + JdbcTestHelper.cleanUpDataTable(dataSource, tableName); } } @Test @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws Exception { - pipeline .apply(Create.empty(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))) .apply(JdbcIO.<KV<Integer, String>>write() http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java deleted file mode 100644 index 0b88be2..0000000 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.jdbc; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import javax.sql.DataSource; -import org.apache.beam.sdk.io.common.IOTestPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.postgresql.ds.PGSimpleDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manipulates test data used by the {@link org.apache.beam.sdk.io.jdbc.JdbcIO} tests. - * - * <p>This is independent from the tests so that for read tests it can be run separately after data - * store creation rather than every time (which can be more fragile.) - */ -public class JdbcTestDataSet { - private static final Logger LOG = LoggerFactory.getLogger(JdbcTestDataSet.class); - public static final String[] SCIENTISTS = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", - "Faraday", "McClintock", "Herschel", "Hopper", "Lovelace"}; - /** - * Use this to create the read tables before IT read tests. 
- * - * <p>To invoke this class, you can use this command line: - * (run from the jdbc root directory) - * mvn test-compile exec:java -Dexec.mainClass=org.apache.beam.sdk.io.jdbc.JdbcTestDataSet \ - * -Dexec.args="--postgresServerName=127.0.0.1 --postgresUsername=postgres \ - * --postgresDatabaseName=myfancydb \ - * --postgresPassword=yourpassword --postgresSsl=false" \ - * -Dexec.classpathScope=test - * @param args Please pass options from IOTestPipelineOptions used for connection to postgres as - * shown above. - */ - public static void main(String[] args) throws SQLException { - PipelineOptionsFactory.register(IOTestPipelineOptions.class); - IOTestPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class); - - createReadDataTable(getDataSource(options)); - } - - public static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) - throws SQLException { - PGSimpleDataSource dataSource = new PGSimpleDataSource(); - - // Tests must receive parameters for connections from PipelineOptions - // Parameters should be generic to all tests that use a particular datasource, not - // the particular test. 
- dataSource.setDatabaseName(options.getPostgresDatabaseName()); - dataSource.setServerName(options.getPostgresServerName()); - dataSource.setPortNumber(options.getPostgresPort()); - dataSource.setUser(options.getPostgresUsername()); - dataSource.setPassword(options.getPostgresPassword()); - dataSource.setSsl(options.getPostgresSsl()); - - return dataSource; - } - - public static final String READ_TABLE_NAME = "BEAM_TEST_READ"; - - public static void createReadDataTable(DataSource dataSource) throws SQLException { - createDataTable(dataSource, READ_TABLE_NAME); - } - - public static String createWriteDataTable(DataSource dataSource) throws SQLException { - String tableName = "BEAMTEST" + org.joda.time.Instant.now().getMillis(); - createDataTable(dataSource, tableName); - return tableName; - } - - private static void createDataTable(DataSource dataSource, String tableName) throws SQLException { - try (Connection connection = dataSource.getConnection()) { - // something like this will need to happen in tests on a newly created postgres server, - // but likely it will happen in perfkit, not here - // alternatively, we may have a pipelineoption indicating whether we want to - // re-use the database or create a new one - try (Statement statement = connection.createStatement()) { - statement.execute( - String.format("create table %s (id INT, name VARCHAR(500))", tableName)); - } - - connection.setAutoCommit(false); - try (PreparedStatement preparedStatement = - connection.prepareStatement( - String.format("insert into %s values (?,?)", tableName))) { - for (int i = 0; i < 1000; i++) { - int index = i % SCIENTISTS.length; - preparedStatement.clearParameters(); - preparedStatement.setInt(1, i); - preparedStatement.setString(2, SCIENTISTS[index]); - preparedStatement.executeUpdate(); - } - } - connection.commit(); - } - - LOG.info("Created table {}", tableName); - } - - public static void cleanUpDataTable(DataSource dataSource, String tableName) - throws SQLException { - if 
(tableName != null) { - try (Connection connection = dataSource.getConnection(); - Statement statement = connection.createStatement()) { - statement.executeUpdate(String.format("drop table %s", tableName)); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/a6201ed1/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java new file mode 100644 index 0000000..fedae51 --- /dev/null +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.jdbc; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import javax.sql.DataSource; +import org.apache.beam.sdk.io.common.TestRow; + +/** + * Contains Test helper methods used by both Integration and Unit Tests in + * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}. + */ +class JdbcTestHelper { + static String getTableName(String testIdentifier) throws ParseException { + SimpleDateFormat formatter = new SimpleDateFormat(); + formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S"); + return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date())); + } + + static void createDataTable( + DataSource dataSource, String tableName) + throws SQLException { + try (Connection connection = dataSource.getConnection()) { + try (Statement statement = connection.createStatement()) { + statement.execute( + String.format("create table %s (id INT, name VARCHAR(500))", tableName)); + } + } + } + + static void cleanUpDataTable(DataSource dataSource, String tableName) + throws SQLException { + if (tableName != null) { + try (Connection connection = dataSource.getConnection(); + Statement statement = connection.createStatement()) { + statement.executeUpdate(String.format("drop table %s", tableName)); + } + } + } + + static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> { + @Override + public TestRow mapRow(ResultSet resultSet) throws Exception { + return TestRow.create( + resultSet.getInt("id"), resultSet.getString("name")); + } + } + + static class PrepareStatementFromTestRow + implements JdbcIO.PreparedStatementSetter<TestRow> { + @Override + public void setParameters(TestRow element, PreparedStatement statement) + throws SQLException { + statement.setLong(1, element.id()); + statement.setString(2, element.name()); + } + } 
+ +}