Repository: flink
Updated Branches:
  refs/heads/master 6a0ada81e -> 9fb074c9c


[FLINK-6307] [jdbc] Refactor JDBC tests

JDBCFullTest:
- split testJdbcInOut into 2 methods to avoid manual test-lifecycle calls
JDBCTestBase:
- remove all qualified static accesses
- remove static Connection field
- remove (now) unused prepareTestDB method
- create RowTypeInfo directly instead of first allocating a separate
TypeInfo[]
- rename testData to TEST_DATA in line with naming conventions
- rework test data to not rely on Object arrays

JDBCInputFormatTest:
- call InputFormat#closeInputFormat() in tearDown()
- simplify method exception declarations
- remove unreachable branch when format returns null (this should fail
the test)
- replace loops over splits with for-each loops
- rework comparisons; no longer ignore nulls, no longer check class,
compare directly against expected value

JDBCOutputFormatTest:
- directly create Row instead of first creating a tuple
- simplify method exception declarations

General:
- do not catch exceptions if the catch block only calls Assert.fail()

This closes #3723.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fb074c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fb074c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fb074c9

Branch: refs/heads/master
Commit: 9fb074c9ca97bbfedac1e527fadbe4aeac80af18
Parents: 6a0ada8
Author: zentol <[email protected]>
Authored: Sat Apr 15 18:07:15 2017 +0200
Committer: zentol <[email protected]>
Committed: Wed Apr 19 12:17:45 2017 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/jdbc/JDBCFullTest.java    |  34 ++--
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 189 ++++++-------------
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  68 +++----
 .../flink/api/java/io/jdbc/JDBCTestBase.java    | 140 ++++++--------
 4 files changed, 150 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 88aa4fa..78cf69c 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -22,7 +22,6 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Types;
 
 import org.apache.flink.api.java.DataSet;
@@ -36,20 +35,16 @@ import org.junit.Test;
 public class JDBCFullTest extends JDBCTestBase {
 
        @Test
-       public void testJdbcInOut() throws Exception {
-               //run without parallelism
+       public void testWithoutParallelism() throws Exception {
                runTest(false);
+       }
 
-               //cleanup
-               JDBCTestBase.tearDownClass();
-               JDBCTestBase.prepareTestDb();
-               
-               //run expliting parallelism
+       @Test
+       public void testWithParallelism() throws Exception {
                runTest(true);
-               
        }
 
-       private void runTest(boolean exploitParallelism) {
+       private void runTest(boolean exploitParallelism) throws Exception {
                ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
                JDBCInputFormatBuilder inputBuilder = 
JDBCInputFormat.buildJDBCInputFormat()
                                .setDrivername(JDBCTestBase.DRIVER_CLASS)
@@ -57,10 +52,10 @@ public class JDBCFullTest extends JDBCTestBase {
                                .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
                                .setRowTypeInfo(rowTypeInfo);
 
-               if(exploitParallelism) {
+               if (exploitParallelism) {
                        final int fetchSize = 1;
-                       final Long min = new 
Long(JDBCTestBase.testData[0][0].toString());
-                       final Long max = new 
Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 
fetchSize][0].toString());
+                       final long min = JDBCTestBase.TEST_DATA[0].id;
+                       final long max = 
JDBCTestBase.TEST_DATA[JDBCTestBase.TEST_DATA.length - fetchSize].id;
                        //use a "splittable" query to exploit parallelism
                        inputBuilder = inputBuilder
                                        
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
@@ -69,7 +64,7 @@ public class JDBCFullTest extends JDBCTestBase {
                DataSet<Row> source = 
environment.createInput(inputBuilder.finish());
 
                //NOTE: in this case (with Derby driver) setSqlTypes could be 
skipped, but
-               //some database, doens't handle correctly null values when no 
column type specified
+               //some databases don't handle null values correctly when no column 
type was specified
                //in PreparedStatement.setObject (see its javadoc for more 
details)
                source.output(JDBCOutputFormat.buildJDBCOutputFormat()
                                .setDrivername(JDBCTestBase.DRIVER_CLASS)
@@ -77,11 +72,8 @@ public class JDBCFullTest extends JDBCTestBase {
                                .setQuery("insert into newbooks 
(id,title,author,price,qty) values (?,?,?,?,?)")
                                .setSqlTypes(new int[]{Types.INTEGER, 
Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
                                .finish());
-               try {
-                       environment.execute();
-               } catch (Exception e) {
-                       Assert.fail("JDBC full test failed. " + e.getMessage());
-               }
+
+               environment.execute();
 
                try (
                        Connection dbConn = 
DriverManager.getConnection(JDBCTestBase.DB_URL);
@@ -92,9 +84,7 @@ public class JDBCFullTest extends JDBCTestBase {
                        while (resultSet.next()) {
                                count++;
                        }
-                       Assert.assertEquals(JDBCTestBase.testData.length, 
count);
-               } catch (SQLException e) {
-                       Assert.fail("JDBC full test failed. " + e.getMessage());
+                       Assert.assertEquals(JDBCTestBase.TEST_DATA.length, 
count);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index bee3d25..3f6a87a 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -39,6 +39,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
        public void tearDown() throws IOException {
                if (jdbcInputFormat != null) {
                        jdbcInputFormat.close();
+                       jdbcInputFormat.closeInputFormat();
                }
                jdbcInputFormat = null;
        }
@@ -96,7 +97,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
        }
 
        @Test
-       public void testJDBCInputFormatWithoutParallelism() throws IOException, 
InstantiationException, IllegalAccessException {
+       public void testJDBCInputFormatWithoutParallelism() throws IOException {
                jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                                .setDrivername(DRIVER_CLASS)
                                .setDBUrl(DB_URL)
@@ -112,43 +113,21 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                int recordCount = 0;
                while (!jdbcInputFormat.reachedEnd()) {
                        Row next = jdbcInputFormat.nextRecord(row);
-                       if (next == null) {
-                               break;
-                       }
-                       
-                       if (next.getField(0) != null) {
-                               Assert.assertEquals("Field 0 should be int", 
Integer.class, next.getField(0).getClass());
-                       }
-                       if (next.getField(1) != null) {
-                               Assert.assertEquals("Field 1 should be String", 
String.class, next.getField(1).getClass());
-                       }
-                       if (next.getField(2) != null) {
-                               Assert.assertEquals("Field 2 should be String", 
String.class, next.getField(2).getClass());
-                       }
-                       if (next.getField(3) != null) {
-                               Assert.assertEquals("Field 3 should be float", 
Double.class, next.getField(3).getClass());
-                       }
-                       if (next.getField(4) != null) {
-                               Assert.assertEquals("Field 4 should be int", 
Integer.class, next.getField(4).getClass());
-                       }
 
-                       for (int x = 0; x < 5; x++) {
-                               if(testData[recordCount][x]!=null) {
-                                       
Assert.assertEquals(testData[recordCount][x], next.getField(x));
-                               }
-                       }
+                       assertEquals(TEST_DATA[recordCount], next);
+
                        recordCount++;
                }
                jdbcInputFormat.close();
                jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(testData.length, recordCount);
+               Assert.assertEquals(TEST_DATA.length, recordCount);
        }
        
        @Test
-       public void 
testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws 
IOException, InstantiationException, IllegalAccessException {
+       public void 
testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws 
IOException {
                final int fetchSize = 1;
-               final Long min = new Long(JDBCTestBase.testData[0][0] + "");
-               final Long max = new 
Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + "");
+               final long min = TEST_DATA[0].id;
+               final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
                ParameterValuesProvider pramProvider = new 
NumericBetweenParametersProvider(fetchSize, min, max);
                jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                                .setDrivername(DRIVER_CLASS)
@@ -162,49 +141,28 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                jdbcInputFormat.openInputFormat();
                InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
                //this query exploit parallelism (1 split for every id)
-               Assert.assertEquals(testData.length, splits.length);
+               Assert.assertEquals(TEST_DATA.length, splits.length);
                int recordCount = 0;
                Row row =  new Row(5);
-               for (int i = 0; i < splits.length; i++) {
-                       jdbcInputFormat.open(splits[i]);
+               for (InputSplit split : splits) {
+                       jdbcInputFormat.open(split);
                        while (!jdbcInputFormat.reachedEnd()) {
                                Row next = jdbcInputFormat.nextRecord(row);
-                               if (next == null) {
-                                       break;
-                               }
-                               if (next.getField(0) != null) {
-                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, next.getField(0).getClass());
-                               }
-                               if (next.getField(1) != null) {
-                                       Assert.assertEquals("Field 1 should be 
String", String.class, next.getField(1).getClass());
-                               }
-                               if (next.getField(2) != null) {
-                                       Assert.assertEquals("Field 2 should be 
String", String.class, next.getField(2).getClass());
-                               }
-                               if (next.getField(3) != null) {
-                                       Assert.assertEquals("Field 3 should be 
float", Double.class, next.getField(3).getClass());
-                               }
-                               if (next.getField(4) != null) {
-                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, next.getField(4).getClass());
-                               }
 
-                               for (int x = 0; x < 5; x++) {
-                                       if(testData[recordCount][x]!=null) {
-                                               
Assert.assertEquals(testData[recordCount][x], next.getField(x));
-                                       }
-                               }
+                               assertEquals(TEST_DATA[recordCount], next);
+
                                recordCount++;
                        }
                        jdbcInputFormat.close();
                }
                jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(testData.length, recordCount);
+               Assert.assertEquals(TEST_DATA.length, recordCount);
        }
 
        @Test
-       public void 
testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws 
IOException, InstantiationException, IllegalAccessException {
-               final Long min = new Long(JDBCTestBase.testData[0][0] + "");
-               final Long max = new 
Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0] + "");
+       public void 
testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws 
IOException {
+               final long min = TEST_DATA[0].id;
+               final long max = TEST_DATA[TEST_DATA.length - 1].id;
                final long fetchSize = max + 1;//generate a single split
                ParameterValuesProvider pramProvider = new 
NumericBetweenParametersProvider(fetchSize, min, max);
                jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
@@ -222,47 +180,26 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                Assert.assertEquals(1, splits.length);
                int recordCount = 0;
                Row row =  new Row(5);
-               for (int i = 0; i < splits.length; i++) {
-                       jdbcInputFormat.open(splits[i]);
+               for (InputSplit split : splits) {
+                       jdbcInputFormat.open(split);
                        while (!jdbcInputFormat.reachedEnd()) {
                                Row next = jdbcInputFormat.nextRecord(row);
-                               if (next == null) {
-                                       break;
-                               }
-                               if (next.getField(0) != null) {
-                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, next.getField(0).getClass());
-                               }
-                               if (next.getField(1) != null) {
-                                       Assert.assertEquals("Field 1 should be 
String", String.class, next.getField(1).getClass());
-                               }
-                               if (next.getField(2) != null) {
-                                       Assert.assertEquals("Field 2 should be 
String", String.class, next.getField(2).getClass());
-                               }
-                               if (next.getField(3) != null) {
-                                       Assert.assertEquals("Field 3 should be 
float", Double.class, next.getField(3).getClass());
-                               }
-                               if (next.getField(4) != null) {
-                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, next.getField(4).getClass());
-                               }
 
-                               for (int x = 0; x < 5; x++) {
-                                       if(testData[recordCount][x]!=null) {
-                                               
Assert.assertEquals(testData[recordCount][x], next.getField(x));
-                                       }
-                               }
+                               assertEquals(TEST_DATA[recordCount], next);
+
                                recordCount++;
                        }
                        jdbcInputFormat.close();
                }
                jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(testData.length, recordCount);
+               Assert.assertEquals(TEST_DATA.length, recordCount);
        }
        
        @Test
-       public void testJDBCInputFormatWithParallelismAndGenericSplitting() 
throws IOException, InstantiationException, IllegalAccessException {
+       public void testJDBCInputFormatWithParallelismAndGenericSplitting() 
throws IOException {
                Serializable[][] queryParameters = new String[2][1];
-               queryParameters[0] = new String[]{"Kumar"};
-               queryParameters[1] = new String[]{"Tan Ah Teck"};
+               queryParameters[0] = new String[]{TEST_DATA[3].author};
+               queryParameters[1] = new String[]{TEST_DATA[0].author};
                ParameterValuesProvider paramProvider = new 
GenericParameterValuesProvider(queryParameters);
                jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                                .setDrivername(DRIVER_CLASS)
@@ -272,45 +209,38 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                                .setParametersProvider(paramProvider)
                                
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
                                .finish();
+
                jdbcInputFormat.openInputFormat();
                InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
                //this query exploit parallelism (1 split for every 
queryParameters row)
                Assert.assertEquals(queryParameters.length, splits.length);
-               int recordCount = 0;
+
+               verifySplit(splits[0], TEST_DATA[3].id);
+               verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id);
+
+               jdbcInputFormat.closeInputFormat();
+       }
+
+       private void verifySplit(InputSplit split, int expectedIDSum) throws 
IOException {
+               int sum = 0;
+
                Row row =  new Row(5);
-               for (int i = 0; i < splits.length; i++) {
-                       jdbcInputFormat.open(splits[i]);
-                       while (!jdbcInputFormat.reachedEnd()) {
-                               Row next = jdbcInputFormat.nextRecord(row);
-                               if (next == null) {
-                                       break;
-                               }
-                               if (next.getField(0) != null) {
-                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, next.getField(0).getClass());
-                               }
-                               if (next.getField(1) != null) {
-                                       Assert.assertEquals("Field 1 should be 
String", String.class, next.getField(1).getClass());
-                               }
-                               if (next.getField(2) != null) {
-                                       Assert.assertEquals("Field 2 should be 
String", String.class, next.getField(2).getClass());
-                               }
-                               if (next.getField(3) != null) {
-                                       Assert.assertEquals("Field 3 should be 
float", Double.class, next.getField(3).getClass());
-                               }
-                               if (next.getField(4) != null) {
-                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, next.getField(4).getClass());
-                               }
+               jdbcInputFormat.open(split);
+               while (!jdbcInputFormat.reachedEnd()) {
+                       row = jdbcInputFormat.nextRecord(row);
 
-                               recordCount++;
-                       }
-                       jdbcInputFormat.close();
+                       int id = ((int) row.getField(0));
+                       int testDataIndex = id - 1001;
+                       
+                       assertEquals(TEST_DATA[testDataIndex], row);
+                       sum += id;
                }
-               Assert.assertEquals(3, recordCount);
-               jdbcInputFormat.closeInputFormat();
+               
+               Assert.assertEquals(expectedIDSum, sum);
        }
        
        @Test
-       public void testEmptyResults() throws IOException, 
InstantiationException, IllegalAccessException {
+       public void testEmptyResults() throws IOException {
                jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                                .setDrivername(DRIVER_CLASS)
                                .setDBUrl(DB_URL)
@@ -318,17 +248,22 @@ public class JDBCInputFormatTest extends JDBCTestBase {
                                .setRowTypeInfo(rowTypeInfo)
                                
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
                                .finish();
-               jdbcInputFormat.openInputFormat();
-               jdbcInputFormat.open(null);
-               Row row = new Row(5);
-               int recordsCnt = 0;
-               while (!jdbcInputFormat.reachedEnd()) {
-                       Assert.assertNull(jdbcInputFormat.nextRecord(row));
-                       recordsCnt++;
+               try {
+                       jdbcInputFormat.openInputFormat();
+                       jdbcInputFormat.open(null);
+                       Assert.assertTrue(jdbcInputFormat.reachedEnd());
+               } finally {
+                       jdbcInputFormat.close();
+                       jdbcInputFormat.closeInputFormat();
                }
-               jdbcInputFormat.close();
-               jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(0, recordsCnt);
+       }
+
+       private static void assertEquals(TestEntry expected, Row actual) {
+               Assert.assertEquals(expected.id, actual.getField(0));
+               Assert.assertEquals(expected.title, actual.getField(1));
+               Assert.assertEquals(expected.author, actual.getField(2));
+               Assert.assertEquals(expected.price, actual.getField(3));
+               Assert.assertEquals(expected.qty, actual.getField(4));
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 8de0c34..a67c1ce 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -25,7 +25,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
-import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.types.Row;
 import org.junit.After;
 import org.junit.Assert;
@@ -34,7 +33,6 @@ import org.junit.Test;
 public class JDBCOutputFormatTest extends JDBCTestBase {
 
        private JDBCOutputFormat jdbcOutputFormat;
-       private Tuple5<Integer, String, String, Double, String> tuple5 = new 
Tuple5<>();
 
        @After
        public void tearDown() throws IOException {
@@ -92,23 +90,19 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
                                .finish();
                jdbcOutputFormat.open(0, 1);
 
-               tuple5.setField(4, 0);
-               tuple5.setField("hello", 1);
-               tuple5.setField("world", 2);
-               tuple5.setField(0.99, 3);
-               tuple5.setField("imthewrongtype", 4);
+               Row row = new Row(5);
+               row.setField(0, 4);
+               row.setField(1, "hello");
+               row.setField(2, "world");
+               row.setField(3, 0.99);
+               row.setField(4, "imthewrongtype");
 
-               Row row = new Row(tuple5.getArity());
-               for (int i = 0; i < tuple5.getArity(); i++) {
-                       row.setField(i, tuple5.getField(i));
-               }
                jdbcOutputFormat.writeRecord(row);
                jdbcOutputFormat.close();
        }
 
        @Test
-       public void testJDBCOutputFormat() throws IOException, 
InstantiationException, IllegalAccessException {
-
+       public void testJDBCOutputFormat() throws IOException, SQLException {
                jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
                                .setDrivername(DRIVER_CLASS)
                                .setDBUrl(DB_URL)
@@ -116,54 +110,34 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
                                .finish();
                jdbcOutputFormat.open(0, 1);
 
-               for (int i = 0; i < testData.length; i++) {
-                       Row row = new Row(testData[i].length);
-                       for (int j = 0; j < testData[i].length; j++) {
-                               row.setField(j, testData[i][j]);
-                       }
+               for (JDBCTestBase.TestEntry entry : TEST_DATA) {
+                       Row row = new Row(5);
+                       row.setField(0, entry.id);
+                       row.setField(1, entry.title);
+                       row.setField(2, entry.author);
+                       row.setField(3, entry.price);
+                       row.setField(4, entry.qty);
                        jdbcOutputFormat.writeRecord(row);
                }
 
                jdbcOutputFormat.close();
 
                try (
-                       Connection dbConn = 
DriverManager.getConnection(JDBCTestBase.DB_URL);
+                       Connection dbConn = DriverManager.getConnection(DB_URL);
                        PreparedStatement statement = 
dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
                        ResultSet resultSet = statement.executeQuery()
                ) {
                        int recordCount = 0;
                        while (resultSet.next()) {
-                               Row row = new Row(tuple5.getArity());
-                               for (int i = 0; i < tuple5.getArity(); i++) {
-                                       row.setField(i, resultSet.getObject(i + 
1));
-                               }
-                               if (row.getField(0) != null) {
-                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, row.getField(0).getClass());
-                               }
-                               if (row.getField(1) != null) {
-                                       Assert.assertEquals("Field 1 should be 
String", String.class, row.getField(1).getClass());
-                               }
-                               if (row.getField(2) != null) {
-                                       Assert.assertEquals("Field 2 should be 
String", String.class, row.getField(2).getClass());
-                               }
-                               if (row.getField(3) != null) {
-                                       Assert.assertEquals("Field 3 should be 
float", Double.class, row.getField(3).getClass());
-                               }
-                               if (row.getField(4) != null) {
-                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, row.getField(4).getClass());
-                               }
-
-                               for (int x = 0; x < tuple5.getArity(); x++) {
-                                       if 
(JDBCTestBase.testData[recordCount][x] != null) {
-                                               
Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.getField(x));
-                                       }
-                               }
+                               Assert.assertEquals(TEST_DATA[recordCount].id, 
resultSet.getObject("id"));
+                               
Assert.assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title"));
+                               
Assert.assertEquals(TEST_DATA[recordCount].author, 
resultSet.getObject("author"));
+                               
Assert.assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price"));
+                               Assert.assertEquals(TEST_DATA[recordCount].qty, 
resultSet.getObject("qty"));
 
                                recordCount++;
                        }
-                       Assert.assertEquals(JDBCTestBase.testData.length, 
recordCount);
-               } catch (SQLException e) {
-                       Assert.fail("JDBC OutputFormat test failed. " + 
e.getMessage());
+                       Assert.assertEquals(TEST_DATA.length, recordCount);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fb074c9/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index ffcb26f..13da4c7 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -24,10 +24,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 
 /**
@@ -43,32 +41,44 @@ public class JDBCTestBase {
        public static final String SELECT_ALL_NEWBOOKS = "select * from " + 
OUTPUT_TABLE;
        public static final String SELECT_EMPTY = "select * from books WHERE 
QTY < 0";
        public static final String INSERT_TEMPLATE = "insert into %s (id, 
title, author, price, qty) values (?,?,?,?,?)";
-       public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
-       public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+       public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = 
SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+       public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = 
SELECT_ALL_BOOKS + " WHERE author = ?";
        
-       protected static Connection conn;
+       public static final TestEntry[] TEST_DATA = {
+                       new TestEntry(1001, ("Java public for dummies"), ("Tan 
Ah Teck"), 11.11, 11),
+                       new TestEntry(1002, ("More Java for dummies"), ("Tan Ah 
Teck"), 22.22, 22),
+                       new TestEntry(1003, ("More Java for more dummies"), 
("Mohammad Ali"), 33.33, 33),
+                       new TestEntry(1004, ("A Cup of Java"), ("Kumar"), 
44.44, 44),
+                       new TestEntry(1005, ("A Teaspoon of Java"), ("Kevin 
Jones"), 55.55, 55),
+                       new TestEntry(1006, ("A Teaspoon of Java 1.4"), ("Kevin 
Jones"), 66.66, 66),
+                       new TestEntry(1007, ("A Teaspoon of Java 1.5"), ("Kevin 
Jones"), 77.77, 77),
+                       new TestEntry(1008, ("A Teaspoon of Java 1.6"), ("Kevin 
Jones"), 88.88, 88),
+                       new TestEntry(1009, ("A Teaspoon of Java 1.7"), ("Kevin 
Jones"), 99.99, 99),
+                       new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin 
Jones"), null, 1010)
+       };
 
-       public static final Object[][] testData = {
-                       {1001, ("Java public for dummies"), ("Tan Ah Teck"), 
11.11, 11},
-                       {1002, ("More Java for dummies"), ("Tan Ah Teck"), 
22.22, 22},
-                       {1003, ("More Java for more dummies"), ("Mohammad 
Ali"), 33.33, 33},
-                       {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-                       {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 
55},
-                       {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 
66.66, 66},
-                       {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 
77.77, 77},
-                       {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 
88.88, 88},
-                       {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 
99.99, 99},
-                       {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), 
null, 1010}};
+       protected static class TestEntry {
+               protected final Integer id;
+               protected final String title;
+               protected final String author;
+               protected final Double price;
+               protected final Integer qty;
+               
+               private TestEntry(Integer id, String title, String author, 
Double price, Integer qty) {
+                       this.id = id;
+                       this.title = title;
+                       this.author = author;
+                       this.price = price;
+                       this.qty = qty;
+               }
+       }
 
-       public static final TypeInformation<?>[] fieldTypes = new 
TypeInformation<?>[] {
+       public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO,
-               BasicTypeInfo.INT_TYPE_INFO
-       };
-       
-       public static final RowTypeInfo rowTypeInfo = new 
RowTypeInfo(fieldTypes);
+               BasicTypeInfo.INT_TYPE_INFO);
 
        public static String getCreateQuery(String tableName) {
                StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
");
@@ -84,14 +94,14 @@ public class JDBCTestBase {
        
        public static String getInsertQuery() {
                StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-               for (int i = 0; i < JDBCTestBase.testData.length; i++) {
+               for (int i = 0; i < TEST_DATA.length; i++) {
                        sqlQueryBuilder.append("(")
-                       .append(JDBCTestBase.testData[i][0]).append(",'")
-                       .append(JDBCTestBase.testData[i][1]).append("','")
-                       .append(JDBCTestBase.testData[i][2]).append("',")
-                       .append(JDBCTestBase.testData[i][3]).append(",")
-                       .append(JDBCTestBase.testData[i][4]).append(")");
-                       if (i < JDBCTestBase.testData.length - 1) {
+                       .append(TEST_DATA[i].id).append(",'")
+                       .append(TEST_DATA[i].title).append("','")
+                       .append(TEST_DATA[i].author).append("',")
+                       .append(TEST_DATA[i].price).append(",")
+                       .append(TEST_DATA[i].qty).append(")");
+                       if (i < TEST_DATA.length - 1) {
                                sqlQueryBuilder.append(",");
                        }
                }
@@ -105,79 +115,39 @@ public class JDBCTestBase {
                }
        };
 
-       public static void prepareTestDb() throws Exception {
-               System.setProperty("derby.stream.error.field", 
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-               Class.forName(DRIVER_CLASS);
-               Connection conn = DriverManager.getConnection(DB_URL + 
";create=true");
-
-               //create input table
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(getCreateQuery(INPUT_TABLE));
-               stat.close();
-
-               //create output table
-               stat = conn.createStatement();
-               stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
-               stat.close();
-
-               //prepare input data
-               stat = conn.createStatement();
-               stat.execute(JDBCTestBase.getInsertQuery());
-               stat.close();
-
-               conn.close();
-       }
-
        @BeforeClass
-       public static void setUpClass() throws SQLException {
-               try {
-                       System.setProperty("derby.stream.error.field", 
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-                       prepareDerbyDatabase();
-               } catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
+       public static void prepareDerbyDatabase() throws Exception {
+               System.setProperty("derby.stream.error.field", 
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
 
-       private static void prepareDerbyDatabase() throws 
ClassNotFoundException, SQLException {
                Class.forName(DRIVER_CLASS);
-               conn = DriverManager.getConnection(DB_URL + ";create=true");
-               createTable(INPUT_TABLE);
-               createTable(OUTPUT_TABLE);
-               insertDataIntoInputTable();
-               conn.close();
+               try (Connection conn = DriverManager.getConnection(DB_URL + 
";create=true")) {
+                       createTable(conn, JDBCTestBase.INPUT_TABLE);
+                       createTable(conn, OUTPUT_TABLE);
+                       insertDataIntoInputTable(conn);
+               }
        }
        
-       private static void createTable(String tableName) throws SQLException {
+       private static void createTable(Connection conn, String tableName) 
throws SQLException {
                Statement stat = conn.createStatement();
                stat.executeUpdate(getCreateQuery(tableName));
                stat.close();
        }
        
-       private static void insertDataIntoInputTable() throws SQLException {
+       private static void insertDataIntoInputTable(Connection conn) throws 
SQLException {
                Statement stat = conn.createStatement();
-               stat.execute(JDBCTestBase.getInsertQuery());
+               stat.execute(getInsertQuery());
                stat.close();
        }
 
        @AfterClass
-       public static void tearDownClass() {
-               cleanUpDerbyDatabases();
-       }
+       public static void cleanUpDerbyDatabases() throws Exception {
+               Class.forName(DRIVER_CLASS);
+               try (
+                       Connection conn = DriverManager.getConnection(DB_URL + 
";create=true");
+                       Statement stat = conn.createStatement()) {
 
-       private static void cleanUpDerbyDatabases() {
-               try {
-                       Class.forName(DRIVER_CLASS);
-                       conn = DriverManager.getConnection(DB_URL + 
";create=true");
-                       Statement stat = conn.createStatement();
-                       stat.executeUpdate("DROP TABLE "+INPUT_TABLE);
-                       stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE);
-                       stat.close();
-                       conn.close();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail();
+                       stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
+                       stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);       
                }
        }
-
 }

Reply via email to