Repository: flink Updated Branches: refs/heads/master 6a0ada81e -> 9fb074c9c
[FLINK-6307] [jdbc] Refactor JDBC tests JDBCFullTest: - split testJdbcInOut into 2 methods to avoid manual test-lifecycle calls JDBCTestBase: - remove all qualified static accesses - remove static Connection field - remove (now) unused prepareTestDb method - create RowTypeInfo directly instead of first allocating a separate TypeInfo[] - rename testData to TEST_DATA in line with naming conventions - rework test data to not rely on Object arrays JDBCInputFormatTest: - call InputFormat#closeInputFormat() in tearDown() - simplify method exception declarations - remove unreachable branch when format returns null (this should fail the test) - replace loops over splits with for-each loops - rework comparisons; no longer ignore nulls, no longer check class, compare directly against expected value JDBCOutputFormatTest: - directly create Row instead of first creating a tuple - simplify method exception declarations General: - do not catch exceptions if the catch block only calls Assert.fail() This closes #3723. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fb074c9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fb074c9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fb074c9 Branch: refs/heads/master Commit: 9fb074c9ca97bbfedac1e527fadbe4aeac80af18 Parents: 6a0ada8 Author: zentol <[email protected]> Authored: Sat Apr 15 18:07:15 2017 +0200 Committer: zentol <[email protected]> Committed: Wed Apr 19 12:17:45 2017 +0200 ---------------------------------------------------------------------- .../flink/api/java/io/jdbc/JDBCFullTest.java | 34 ++-- .../api/java/io/jdbc/JDBCInputFormatTest.java | 189 ++++++------------- .../api/java/io/jdbc/JDBCOutputFormatTest.java | 68 +++---- .../flink/api/java/io/jdbc/JDBCTestBase.java | 140 ++++++-------- 4 files changed, 150 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java index 88aa4fa..78cf69c 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java @@ -22,7 +22,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Types; import org.apache.flink.api.java.DataSet; @@ -36,20 +35,16 @@ import org.junit.Test; public class JDBCFullTest extends JDBCTestBase { @Test - public void testJdbcInOut() throws Exception { - 
//run without parallelism + public void testWithoutParallelism() throws Exception { runTest(false); + } - //cleanup - JDBCTestBase.tearDownClass(); - JDBCTestBase.prepareTestDb(); - - //run expliting parallelism + @Test + public void testWithParallelism() throws Exception { runTest(true); - } - private void runTest(boolean exploitParallelism) { + private void runTest(boolean exploitParallelism) throws Exception { ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(JDBCTestBase.DRIVER_CLASS) @@ -57,10 +52,10 @@ public class JDBCFullTest extends JDBCTestBase { .setQuery(JDBCTestBase.SELECT_ALL_BOOKS) .setRowTypeInfo(rowTypeInfo); - if(exploitParallelism) { + if (exploitParallelism) { final int fetchSize = 1; - final Long min = new Long(JDBCTestBase.testData[0][0].toString()); - final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString()); + final long min = JDBCTestBase.TEST_DATA[0].id; + final long max = JDBCTestBase.TEST_DATA[JDBCTestBase.TEST_DATA.length - fetchSize].id; //use a "splittable" query to exploit parallelism inputBuilder = inputBuilder .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) @@ -69,7 +64,7 @@ public class JDBCFullTest extends JDBCTestBase { DataSet<Row> source = environment.createInput(inputBuilder.finish()); //NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but - //some database, doens't handle correctly null values when no column type specified + //some databases don't null values correctly when no column type was specified //in PreparedStatement.setObject (see its javadoc for more details) source.output(JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername(JDBCTestBase.DRIVER_CLASS) @@ -77,11 +72,8 @@ public class JDBCFullTest extends JDBCTestBase { .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") .setSqlTypes(new 
int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER}) .finish()); - try { - environment.execute(); - } catch (Exception e) { - Assert.fail("JDBC full test failed. " + e.getMessage()); - } + + environment.execute(); try ( Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); @@ -92,9 +84,7 @@ public class JDBCFullTest extends JDBCTestBase { while (resultSet.next()) { count++; } - Assert.assertEquals(JDBCTestBase.testData.length, count); - } catch (SQLException e) { - Assert.fail("JDBC full test failed. " + e.getMessage()); + Assert.assertEquals(JDBCTestBase.TEST_DATA.length, count); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index bee3d25..3f6a87a 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -39,6 +39,7 @@ public class JDBCInputFormatTest extends JDBCTestBase { public void tearDown() throws IOException { if (jdbcInputFormat != null) { jdbcInputFormat.close(); + jdbcInputFormat.closeInputFormat(); } jdbcInputFormat = null; } @@ -96,7 +97,7 @@ public class JDBCInputFormatTest extends JDBCTestBase { } @Test - public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException { + public void testJDBCInputFormatWithoutParallelism() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) .setDBUrl(DB_URL) @@ -112,43 +113,21 @@ public class 
JDBCInputFormatTest extends JDBCTestBase { int recordCount = 0; while (!jdbcInputFormat.reachedEnd()) { Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - - if (next.getField(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass()); - } - if (next.getField(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass()); - } - if (next.getField(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass()); - } - if (next.getField(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass()); - } - if (next.getField(4) != null) { - Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass()); - } - for (int x = 0; x < 5; x++) { - if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.getField(x)); - } - } + assertEquals(TEST_DATA[recordCount], next); + recordCount++; } jdbcInputFormat.close(); jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(testData.length, recordCount); + Assert.assertEquals(TEST_DATA.length, recordCount); } @Test - public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException { + public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException { final int fetchSize = 1; - final Long min = new Long(JDBCTestBase.testData[0][0] + ""); - final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + ""); + final long min = TEST_DATA[0].id; + final long max = TEST_DATA[TEST_DATA.length - fetchSize].id; ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) @@ -162,49 +141,28 @@ public class JDBCInputFormatTest 
extends JDBCTestBase { jdbcInputFormat.openInputFormat(); InputSplit[] splits = jdbcInputFormat.createInputSplits(1); //this query exploit parallelism (1 split for every id) - Assert.assertEquals(testData.length, splits.length); + Assert.assertEquals(TEST_DATA.length, splits.length); int recordCount = 0; Row row = new Row(5); - for (int i = 0; i < splits.length; i++) { - jdbcInputFormat.open(splits[i]); + for (InputSplit split : splits) { + jdbcInputFormat.open(split); while (!jdbcInputFormat.reachedEnd()) { Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - if (next.getField(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass()); - } - if (next.getField(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass()); - } - if (next.getField(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass()); - } - if (next.getField(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass()); - } - if (next.getField(4) != null) { - Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass()); - } - for (int x = 0; x < 5; x++) { - if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.getField(x)); - } - } + assertEquals(TEST_DATA[recordCount], next); + recordCount++; } jdbcInputFormat.close(); } jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(testData.length, recordCount); + Assert.assertEquals(TEST_DATA.length, recordCount); } @Test - public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException { - final Long min = new Long(JDBCTestBase.testData[0][0] + ""); - final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + ""); + public void 
testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException { + final long min = TEST_DATA[0].id; + final long max = TEST_DATA[TEST_DATA.length - 1].id; final long fetchSize = max + 1;//generate a single split ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() @@ -222,47 +180,26 @@ public class JDBCInputFormatTest extends JDBCTestBase { Assert.assertEquals(1, splits.length); int recordCount = 0; Row row = new Row(5); - for (int i = 0; i < splits.length; i++) { - jdbcInputFormat.open(splits[i]); + for (InputSplit split : splits) { + jdbcInputFormat.open(split); while (!jdbcInputFormat.reachedEnd()) { Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - if (next.getField(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass()); - } - if (next.getField(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass()); - } - if (next.getField(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass()); - } - if (next.getField(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass()); - } - if (next.getField(4) != null) { - Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass()); - } - for (int x = 0; x < 5; x++) { - if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.getField(x)); - } - } + assertEquals(TEST_DATA[recordCount], next); + recordCount++; } jdbcInputFormat.close(); } jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(testData.length, recordCount); + Assert.assertEquals(TEST_DATA.length, recordCount); } @Test - public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException { + 
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException { Serializable[][] queryParameters = new String[2][1]; - queryParameters[0] = new String[]{"Kumar"}; - queryParameters[1] = new String[]{"Tan Ah Teck"}; + queryParameters[0] = new String[]{TEST_DATA[3].author}; + queryParameters[1] = new String[]{TEST_DATA[0].author}; ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters); jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) @@ -272,45 +209,38 @@ public class JDBCInputFormatTest extends JDBCTestBase { .setParametersProvider(paramProvider) .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); + jdbcInputFormat.openInputFormat(); InputSplit[] splits = jdbcInputFormat.createInputSplits(1); //this query exploit parallelism (1 split for every queryParameters row) Assert.assertEquals(queryParameters.length, splits.length); - int recordCount = 0; + + verifySplit(splits[0], TEST_DATA[3].id); + verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id); + + jdbcInputFormat.closeInputFormat(); + } + + private void verifySplit(InputSplit split, int expectedIDSum) throws IOException { + int sum = 0; + Row row = new Row(5); - for (int i = 0; i < splits.length; i++) { - jdbcInputFormat.open(splits[i]); - while (!jdbcInputFormat.reachedEnd()) { - Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - if (next.getField(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, next.getField(0).getClass()); - } - if (next.getField(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, next.getField(1).getClass()); - } - if (next.getField(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, next.getField(2).getClass()); - } - if (next.getField(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, next.getField(3).getClass()); - } - if (next.getField(4) != 
null) { - Assert.assertEquals("Field 4 should be int", Integer.class, next.getField(4).getClass()); - } + jdbcInputFormat.open(split); + while (!jdbcInputFormat.reachedEnd()) { + row = jdbcInputFormat.nextRecord(row); - recordCount++; - } - jdbcInputFormat.close(); + int id = ((int) row.getField(0)); + int testDataIndex = id - 1001; + + assertEquals(TEST_DATA[testDataIndex], row); + sum += id; } - Assert.assertEquals(3, recordCount); - jdbcInputFormat.closeInputFormat(); + + Assert.assertEquals(expectedIDSum, sum); } @Test - public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException { + public void testEmptyResults() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) .setDBUrl(DB_URL) @@ -318,17 +248,22 @@ public class JDBCInputFormatTest extends JDBCTestBase { .setRowTypeInfo(rowTypeInfo) .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); - jdbcInputFormat.openInputFormat(); - jdbcInputFormat.open(null); - Row row = new Row(5); - int recordsCnt = 0; - while (!jdbcInputFormat.reachedEnd()) { - Assert.assertNull(jdbcInputFormat.nextRecord(row)); - recordsCnt++; + try { + jdbcInputFormat.openInputFormat(); + jdbcInputFormat.open(null); + Assert.assertTrue(jdbcInputFormat.reachedEnd()); + } finally { + jdbcInputFormat.close(); + jdbcInputFormat.closeInputFormat(); } - jdbcInputFormat.close(); - jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(0, recordsCnt); + } + + private static void assertEquals(TestEntry expected, Row actual) { + Assert.assertEquals(expected.id, actual.getField(0)); + Assert.assertEquals(expected.title, actual.getField(1)); + Assert.assertEquals(expected.author, actual.getField(2)); + Assert.assertEquals(expected.price, actual.getField(3)); + Assert.assertEquals(expected.qty, actual.getField(4)); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java index 8de0c34..a67c1ce 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -25,7 +25,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.types.Row; import org.junit.After; import org.junit.Assert; @@ -34,7 +33,6 @@ import org.junit.Test; public class JDBCOutputFormatTest extends JDBCTestBase { private JDBCOutputFormat jdbcOutputFormat; - private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>(); @After public void tearDown() throws IOException { @@ -92,23 +90,19 @@ public class JDBCOutputFormatTest extends JDBCTestBase { .finish(); jdbcOutputFormat.open(0, 1); - tuple5.setField(4, 0); - tuple5.setField("hello", 1); - tuple5.setField("world", 2); - tuple5.setField(0.99, 3); - tuple5.setField("imthewrongtype", 4); + Row row = new Row(5); + row.setField(0, 4); + row.setField(1, "hello"); + row.setField(2, "world"); + row.setField(3, 0.99); + row.setField(4, "imthewrongtype"); - Row row = new Row(tuple5.getArity()); - for (int i = 0; i < tuple5.getArity(); i++) { - row.setField(i, tuple5.getField(i)); - } jdbcOutputFormat.writeRecord(row); jdbcOutputFormat.close(); } @Test - public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException { - + public void testJDBCOutputFormat() 
throws IOException, SQLException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername(DRIVER_CLASS) .setDBUrl(DB_URL) @@ -116,54 +110,34 @@ public class JDBCOutputFormatTest extends JDBCTestBase { .finish(); jdbcOutputFormat.open(0, 1); - for (int i = 0; i < testData.length; i++) { - Row row = new Row(testData[i].length); - for (int j = 0; j < testData[i].length; j++) { - row.setField(j, testData[i][j]); - } + for (JDBCTestBase.TestEntry entry : TEST_DATA) { + Row row = new Row(5); + row.setField(0, entry.id); + row.setField(1, entry.title); + row.setField(2, entry.author); + row.setField(3, entry.price); + row.setField(4, entry.qty); jdbcOutputFormat.writeRecord(row); } jdbcOutputFormat.close(); try ( - Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); + Connection dbConn = DriverManager.getConnection(DB_URL); PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS); ResultSet resultSet = statement.executeQuery() ) { int recordCount = 0; while (resultSet.next()) { - Row row = new Row(tuple5.getArity()); - for (int i = 0; i < tuple5.getArity(); i++) { - row.setField(i, resultSet.getObject(i + 1)); - } - if (row.getField(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, row.getField(0).getClass()); - } - if (row.getField(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, row.getField(1).getClass()); - } - if (row.getField(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, row.getField(2).getClass()); - } - if (row.getField(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, row.getField(3).getClass()); - } - if (row.getField(4) != null) { - Assert.assertEquals("Field 4 should be int", Integer.class, row.getField(4).getClass()); - } - - for (int x = 0; x < tuple5.getArity(); x++) { - if (JDBCTestBase.testData[recordCount][x] != null) { - 
Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.getField(x)); - } - } + Assert.assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id")); + Assert.assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title")); + Assert.assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author")); + Assert.assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price")); + Assert.assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty")); recordCount++; } - Assert.assertEquals(JDBCTestBase.testData.length, recordCount); - } catch (SQLException e) { - Assert.fail("JDBC OutputFormat test failed. " + e.getMessage()); + Assert.assertEquals(TEST_DATA.length, recordCount); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java index ffcb26f..13da4c7 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java @@ -24,10 +24,8 @@ import java.sql.SQLException; import java.sql.Statement; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; /** @@ -43,32 +41,44 @@ public class JDBCTestBase { public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE; public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0"; public static final String INSERT_TEMPLATE = "insert into %s 
(id, title, author, price, qty) values (?,?,?,?,?)"; - public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?"; - public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?"; - protected static Connection conn; + public static final TestEntry[] TEST_DATA = { + new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11), + new TestEntry(1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22), + new TestEntry(1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33), + new TestEntry(1004, ("A Cup of Java"), ("Kumar"), 44.44, 44), + new TestEntry(1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55), + new TestEntry(1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66), + new TestEntry(1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77), + new TestEntry(1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88), + new TestEntry(1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99), + new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010) + }; - public static final Object[][] testData = { - {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}, - {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66}, - {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77}, - {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88}, - {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99}, - 
{1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}}; + protected static class TestEntry { + protected final Integer id; + protected final String title; + protected final String author; + protected final Double price; + protected final Integer qty; + + private TestEntry(Integer id, String title, String author, Double price, Integer qty) { + this.id = id; + this.title = title; + this.author = author; + this.price = price; + this.qty = qty; + } + } - public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { + public static final RowTypeInfo rowTypeInfo = new RowTypeInfo( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }; - - public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); + BasicTypeInfo.INT_TYPE_INFO); public static String getCreateQuery(String tableName) { StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE "); @@ -84,14 +94,14 @@ public class JDBCTestBase { public static String getInsertQuery() { StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - for (int i = 0; i < JDBCTestBase.testData.length; i++) { + for (int i = 0; i < TEST_DATA.length; i++) { sqlQueryBuilder.append("(") - .append(JDBCTestBase.testData[i][0]).append(",'") - .append(JDBCTestBase.testData[i][1]).append("','") - .append(JDBCTestBase.testData[i][2]).append("',") - .append(JDBCTestBase.testData[i][3]).append(",") - .append(JDBCTestBase.testData[i][4]).append(")"); - if (i < JDBCTestBase.testData.length - 1) { + .append(TEST_DATA[i].id).append(",'") + .append(TEST_DATA[i].title).append("','") + .append(TEST_DATA[i].author).append("',") + .append(TEST_DATA[i].price).append(",") + .append(TEST_DATA[i].qty).append(")"); + if (i < TEST_DATA.length - 1) { sqlQueryBuilder.append(","); } } @@ -105,79 +115,39 @@ public class JDBCTestBase { } }; - public static void 
prepareTestDb() throws Exception { - System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); - Class.forName(DRIVER_CLASS); - Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); - - //create input table - Statement stat = conn.createStatement(); - stat.executeUpdate(getCreateQuery(INPUT_TABLE)); - stat.close(); - - //create output table - stat = conn.createStatement(); - stat.executeUpdate(getCreateQuery(OUTPUT_TABLE)); - stat.close(); - - //prepare input data - stat = conn.createStatement(); - stat.execute(JDBCTestBase.getInsertQuery()); - stat.close(); - - conn.close(); - } - @BeforeClass - public static void setUpClass() throws SQLException { - try { - System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); - prepareDerbyDatabase(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } - } + public static void prepareDerbyDatabase() throws Exception { + System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); - private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { Class.forName(DRIVER_CLASS); - conn = DriverManager.getConnection(DB_URL + ";create=true"); - createTable(INPUT_TABLE); - createTable(OUTPUT_TABLE); - insertDataIntoInputTable(); - conn.close(); + try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) { + createTable(conn, JDBCTestBase.INPUT_TABLE); + createTable(conn, OUTPUT_TABLE); + insertDataIntoInputTable(conn); + } } - private static void createTable(String tableName) throws SQLException { + private static void createTable(Connection conn, String tableName) throws SQLException { Statement stat = conn.createStatement(); stat.executeUpdate(getCreateQuery(tableName)); stat.close(); } - private static void insertDataIntoInputTable() throws SQLException { + private static void insertDataIntoInputTable(Connection 
conn) throws SQLException { Statement stat = conn.createStatement(); - stat.execute(JDBCTestBase.getInsertQuery()); + stat.execute(getInsertQuery()); stat.close(); } @AfterClass - public static void tearDownClass() { - cleanUpDerbyDatabases(); - } + public static void cleanUpDerbyDatabases() throws Exception { + Class.forName(DRIVER_CLASS); + try ( + Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); + Statement stat = conn.createStatement()) { - private static void cleanUpDerbyDatabases() { - try { - Class.forName(DRIVER_CLASS); - conn = DriverManager.getConnection(DB_URL + ";create=true"); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE "+INPUT_TABLE); - stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); + stat.executeUpdate("DROP TABLE " + INPUT_TABLE); + stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE); } } - }
