http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java new file mode 100644 index 0000000..2ed2f8c --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io.jdbc.split; + +import java.io.Serializable; + +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; + +/** + * + * This splits generator actually does nothing but wrapping the query parameters + * computed by the user before creating the {@link JDBCInputFormat} instance. + * + * */ +public class GenericParameterValuesProvider implements ParameterValuesProvider { + + private final Serializable[][] parameters; + + public GenericParameterValuesProvider(Serializable[][] parameters) { + this.parameters = parameters; + } + + @Override + public Serializable[][] getParameterValues(){ + //do nothing...precomputed externally + return parameters; + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java new file mode 100644 index 0000000..ac56b98 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io.jdbc.split; + +import java.io.Serializable; + +/** + * + * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column. + * The generated query set will be of size equal to the configured fetchSize (apart the last one range), + * ranging from the min value up to the max. + * + * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like: + * <PRE> + * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ? + * </PRE> + * + * you can use this class to automatically generate the parameters of the BETWEEN clause, + * based on the passed constructor parameters. + * + * */ +public class NumericBetweenParametersProvider implements ParameterValuesProvider { + + private long fetchSize; + private final long min; + private final long max; + + public NumericBetweenParametersProvider(long fetchSize, long min, long max) { + this.fetchSize = fetchSize; + this.min = min; + this.max = max; + } + + @Override + public Serializable[][] getParameterValues(){ + double maxElemCount = (max - min) + 1; + int size = new Double(Math.ceil(maxElemCount / fetchSize)).intValue(); + Serializable[][] parameters = new Serializable[size][2]; + int count = 0; + for (long i = min; i < max; i += fetchSize, count++) { + long currentLimit = i + fetchSize - 1; + parameters[count] = new Long[]{i,currentLimit}; + if (currentLimit + 1 + fetchSize > max) { + parameters[count + 1] = new Long[]{currentLimit + 1, max}; + break; + } + } + return parameters; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java new file mode 100644 index 0000000..c194497 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io.jdbc.split; + +import java.io.Serializable; + +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; + +/** + * + * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits). + * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider} implementation + * + * */ +public interface ParameterValuesProvider { + + /** Returns the necessary parameters array to use for query in parallel a table */ + public Serializable[][] getParameterValues(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java new file mode 100644 index 0000000..da9469b --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.table.Row; +import org.junit.Assert; +import org.junit.Test; + +public class JDBCFullTest extends JDBCTestBase { + + @Test + public void testJdbcInOut() throws Exception { + //run without parallelism + runTest(false); + + //cleanup + JDBCTestBase.tearDownClass(); + JDBCTestBase.prepareTestDb(); + + //run expliting parallelism + runTest(true); + + } + + private void runTest(boolean exploitParallelism) { + ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(JDBCTestBase.DRIVER_CLASS) + .setDBUrl(JDBCTestBase.DB_URL) + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo); + + if(exploitParallelism) { + final int fetchSize = 1; + final Long min = new Long(JDBCTestBase.testData[0][0].toString()); + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString()); + //use a "splittable" query to exploit parallelism + inputBuilder = inputBuilder + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) + .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max)); + } + DataSet<Row> source = environment.createInput(inputBuilder.finish()); + + //NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but + //some database, doens't handle correctly null values when no column type specified + //in PreparedStatement.setObject (see its javadoc for more details) + source.output(JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(JDBCTestBase.DRIVER_CLASS) + .setDBUrl(JDBCTestBase.DB_URL) + .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") + .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER}) + .finish()); + try { + environment.execute(); + } catch (Exception e) { + Assert.fail("JDBC full test failed. " + e.getMessage()); + } + + try ( + Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); + PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS); + ResultSet resultSet = statement.executeQuery() + ) { + int count = 0; + while (resultSet.next()) { + count++; + } + Assert.assertEquals(JDBCTestBase.testData.length, count); + } catch (SQLException e) { + Assert.fail("JDBC full test failed. " + e.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java new file mode 100644 index 0000000..efae076 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.ResultSet; + +import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.table.Row; +import org.apache.flink.core.io.InputSplit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class JDBCInputFormatTest extends JDBCTestBase { + + private JDBCInputFormat jdbcInputFormat; + + @After + public void tearDown() throws IOException { + if (jdbcInputFormat != null) { + jdbcInputFormat.close(); + } + jdbcInputFormat = null; + } + + @Test(expected = IllegalArgumentException.class) + public void testUntypedRowInfo() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.idontexist") + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .finish(); + jdbcInputFormat.openInputFormat(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidDriver() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.idontexist") + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) + .finish(); + jdbcInputFormat.openInputFormat(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidURL() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) + .finish(); + jdbcInputFormat.openInputFormat(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidQuery() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery("iamnotsql") + .setRowTypeInfo(rowTypeInfo) + .finish(); + jdbcInputFormat.openInputFormat(); + } + + @Test(expected = IllegalArgumentException.class) + public void testIncompleteConfiguration() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) + .finish(); + } + + @Test + public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + //this query does not exploit parallelism + Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length); + jdbcInputFormat.openInputFormat(); + jdbcInputFormat.open(null); + Row row = new Row(5); + int recordCount = 0; + while (!jdbcInputFormat.reachedEnd()) { + Row next = jdbcInputFormat.nextRecord(row); + if (next == null) { + break; + } + + if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} + if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} + if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} + if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} + if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + + for (int x = 0; x < 5; x++) { + if(testData[recordCount][x]!=null) { + Assert.assertEquals(testData[recordCount][x], next.productElement(x)); + } + } + recordCount++; + } + jdbcInputFormat.close(); + jdbcInputFormat.closeInputFormat(); + Assert.assertEquals(testData.length, recordCount); + } + + @Test + public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException { + final int fetchSize = 1; + final Long min = new Long(JDBCTestBase.testData[0][0] + ""); + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + ""); + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) + .setRowTypeInfo(rowTypeInfo) + .setParametersProvider(pramProvider) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + + jdbcInputFormat.openInputFormat(); + InputSplit[] splits = jdbcInputFormat.createInputSplits(1); + //this query exploit parallelism (1 split for every id) + Assert.assertEquals(testData.length, splits.length); + int recordCount = 0; + Row row = new Row(5); + for (int i = 0; i < splits.length; i++) { + jdbcInputFormat.open(splits[i]); + while (!jdbcInputFormat.reachedEnd()) { + Row next = jdbcInputFormat.nextRecord(row); + if (next == null) { + break; + } + if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} + if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} + if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} + if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} + if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + + for (int x = 0; x < 5; x++) { + if(testData[recordCount][x]!=null) { + Assert.assertEquals(testData[recordCount][x], next.productElement(x)); + } + } + recordCount++; + } + jdbcInputFormat.close(); + } + jdbcInputFormat.closeInputFormat(); + Assert.assertEquals(testData.length, recordCount); + } + + @Test + public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException { + Serializable[][] queryParameters = new String[2][1]; + queryParameters[0] = new String[]{"Kumar"}; + queryParameters[1] = new String[]{"Tan Ah Teck"}; + ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters); + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR) + .setRowTypeInfo(rowTypeInfo) + .setParametersProvider(paramProvider) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + jdbcInputFormat.openInputFormat(); + InputSplit[] splits = jdbcInputFormat.createInputSplits(1); + //this query exploit parallelism (1 split for every queryParameters row) + Assert.assertEquals(queryParameters.length, splits.length); + int recordCount = 0; + Row row = new Row(5); + for (int i = 0; i < splits.length; i++) { + jdbcInputFormat.open(splits[i]); + while (!jdbcInputFormat.reachedEnd()) { + Row next = jdbcInputFormat.nextRecord(row); + if (next == null) { + break; + } + if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} + if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} + if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} + if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} + if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + + recordCount++; + } + jdbcInputFormat.close(); + } + Assert.assertEquals(3, recordCount); + jdbcInputFormat.closeInputFormat(); + } + + @Test + public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_EMPTY) + .setRowTypeInfo(rowTypeInfo) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + jdbcInputFormat.openInputFormat(); + jdbcInputFormat.open(null); + Row row = new Row(5); + int recordsCnt = 0; + while (!jdbcInputFormat.reachedEnd()) { + Assert.assertNull(jdbcInputFormat.nextRecord(row)); + recordsCnt++; + } + jdbcInputFormat.close(); + jdbcInputFormat.closeInputFormat(); + Assert.assertEquals(0, recordsCnt); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java new file mode 100644 index 0000000..086a84c --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.table.Row; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class JDBCOutputFormatTest extends JDBCTestBase { + + private JDBCOutputFormat jdbcOutputFormat; + private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>(); + + @After + public void tearDown() throws IOException { + if (jdbcOutputFormat != null) { + jdbcOutputFormat.close(); + } + jdbcOutputFormat = null; + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidDriver() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.idontexist") + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) + .finish(); + jdbcOutputFormat.open(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidURL() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) + .finish(); + jdbcOutputFormat.open(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidQuery() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery("iamnotsql") + .finish(); + jdbcOutputFormat.open(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testIncompleteConfiguration() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(DRIVER_CLASS) + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) + .finish(); + } + + + @Test(expected = IllegalArgumentException.class) + public void testIncompatibleTypes() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) + .finish(); + jdbcOutputFormat.open(0, 1); + + tuple5.setField(4, 0); + tuple5.setField("hello", 1); + tuple5.setField("world", 2); + tuple5.setField(0.99, 3); + tuple5.setField("imthewrongtype", 4); + + Row row = new Row(tuple5.getArity()); + for (int i = 0; i < tuple5.getArity(); i++) { + row.setField(i, tuple5.getField(i)); + } + jdbcOutputFormat.writeRecord(row); + jdbcOutputFormat.close(); + } + + @Test + public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException { + + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE)) + .finish(); + jdbcOutputFormat.open(0, 1); + + for (int i = 0; i < testData.length; i++) { + Row row = new Row(testData[i].length); + for (int j = 0; j < testData[i].length; j++) { + row.setField(j, testData[i][j]); + } + jdbcOutputFormat.writeRecord(row); + } + + jdbcOutputFormat.close(); + + try ( + Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); + PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS); + ResultSet resultSet = statement.executeQuery() + ) { + int recordCount = 0; + while (resultSet.next()) { + Row row = new Row(tuple5.getArity()); + for (int i = 0; i < tuple5.getArity(); i++) { + row.setField(i, resultSet.getObject(i + 1)); + } + if (row.productElement(0) != null) { + Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass()); + } + if (row.productElement(1) != null) { + Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass()); + } + if (row.productElement(2) != null) { + Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass()); + } + if (row.productElement(3) != null) { + Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass()); + } + if (row.productElement(4) != null) { + Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass()); + } + + for (int x = 0; x < tuple5.getArity(); x++) { + if (JDBCTestBase.testData[recordCount][x] != null) { + Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x)); + } + } + + recordCount++; + } + Assert.assertEquals(JDBCTestBase.testData.length, recordCount); + } catch (SQLException e) { + Assert.fail("JDBC OutputFormat test failed. " + e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java new file mode 100644 index 0000000..69ad693 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io.jdbc; + +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +/** + * Base test class for JDBC Input and Output formats + */ +public class JDBCTestBase { + + public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; + public static final String DB_URL = "jdbc:derby:memory:ebookshop"; + public static final String INPUT_TABLE = "books"; + public static final String OUTPUT_TABLE = "newbooks"; + public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; + public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE; + public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0"; + public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?"; + public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?"; + + protected static Connection conn; + + public static final Object[][] testData = { + {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11}, + {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, + {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, + {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, + {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}, + {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66}, + {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77}, + {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88}, + {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99}, + {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}}; + + public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO + }; + + public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); + + public static String getCreateQuery(String tableName) { + StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE "); + sqlQueryBuilder.append(tableName).append(" ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + return sqlQueryBuilder.toString(); + } + + public static String getInsertQuery() { + StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); + for (int i = 0; i < JDBCTestBase.testData.length; i++) { + sqlQueryBuilder.append("(") + .append(JDBCTestBase.testData[i][0]).append(",'") + .append(JDBCTestBase.testData[i][1]).append("','") + .append(JDBCTestBase.testData[i][2]).append("',") + .append(JDBCTestBase.testData[i][3]).append(",") + .append(JDBCTestBase.testData[i][4]).append(")"); + if (i < JDBCTestBase.testData.length - 1) { + sqlQueryBuilder.append(","); + } + } + String insertQuery = sqlQueryBuilder.toString(); + return insertQuery; + } + + public static final OutputStream DEV_NULL = new OutputStream() { + @Override + public void write(int b) { + } + }; + + public static void prepareTestDb() throws Exception { + System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); + Class.forName(DRIVER_CLASS); + Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); + + //create input table + Statement stat = conn.createStatement(); + stat.executeUpdate(getCreateQuery(INPUT_TABLE)); + stat.close(); + + //create output table + stat = conn.createStatement(); + stat.executeUpdate(getCreateQuery(OUTPUT_TABLE)); + stat.close(); + + //prepare input data + stat = conn.createStatement(); + stat.execute(JDBCTestBase.getInsertQuery()); + stat.close(); + + conn.close(); + } + + @BeforeClass + public static void setUpClass() throws SQLException { + try { + System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); + prepareDerbyDatabase(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { + Class.forName(DRIVER_CLASS); + conn = DriverManager.getConnection(DB_URL + ";create=true"); + createTable(INPUT_TABLE); + createTable(OUTPUT_TABLE); + insertDataIntoInputTable(); + conn.close(); + } + + private static void createTable(String tableName) throws SQLException { + Statement stat = conn.createStatement(); + stat.executeUpdate(getCreateQuery(tableName)); + stat.close(); + } + + private static void insertDataIntoInputTable() throws SQLException { + Statement stat = conn.createStatement(); + stat.execute(JDBCTestBase.getInsertQuery()); + stat.close(); + } + + @AfterClass + public static void tearDownClass() { + cleanUpDerbyDatabases(); + } + + private static void cleanUpDerbyDatabases() { + try { + Class.forName(DRIVER_CLASS); + conn = DriverManager.getConnection(DB_URL + ";create=true"); + Statement stat = conn.createStatement(); + stat.executeUpdate("DROP TABLE "+INPUT_TABLE); + stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE); + stat.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2fb9345 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties @@ -0,0 +1,19 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml new file mode 100644 index 0000000..dcb33eb --- /dev/null +++ b/flink-connectors/pom.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + + <artifactId>flink-connectors</artifactId> + <name>flink-connectors</name> + <packaging>pom</packaging> + + <modules> + <module>flink-avro</module> + <module>flink-jdbc</module> + <module>flink-hadoop-compatibility</module> + <module>flink-hbase</module> + <module>flink-hcatalog</module> + <module>flink-connector-flume</module> + <module>flink-connector-kafka-base</module> + <module>flink-connector-kafka-0.8</module> + <module>flink-connector-kafka-0.9</module> + <module>flink-connector-kafka-0.10</module> + <module>flink-connector-elasticsearch</module> + <module>flink-connector-elasticsearch2</module> + <module>flink-connector-rabbitmq</module> + <module>flink-connector-twitter</module> + <module>flink-connector-nifi</module> + <module>flink-connector-cassandra</module> + <module>flink-connector-redis</module> + <module>flink-connector-filesystem</module> + </modules> + + <!-- See main pom.xml for explanation of profiles --> + <profiles> + <!-- + We include the kinesis module only optionally because it contains a dependency + licenced under the "Amazon Software License". + In accordance with the discussion in https://issues.apache.org/jira/browse/LEGAL-198 + this is an optional module for Flink. + --> + <profile> + <id>include-kinesis</id> + <modules> + <module>flink-connector-kinesis</module> + </modules> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml deleted file mode 100644 index 3a1731c..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml +++ /dev/null @@ -1,179 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-cassandra_2.10</artifactId> - <name>flink-connector-cassandra</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <cassandra.version>2.2.5</cassandra.version> - <driver.version>3.0.0</driver.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <reuseForks>true</reuseForks> - <forkCount>1</forkCount> - <argLine>-Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.4.1</version> - <executions> - <!-- Run shade goal on package phase --> - <execution> - <id>shade-flink</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration combine.self="override"> - <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation> - <artifactSet> - <includes> - <include>com.datastax.cassandra:cassandra-driver-core</include> - <include>com.datastax.cassandra:cassandra-driver-mapping</include> - <include>com.google.guava:guava</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>com.google</pattern> - <shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern> - <excludes> - <exclude>com.google.protobuf.**</exclude> - <exclude>com.google.inject.**</exclude> - </excludes> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.datastax.cassandra</groupId> - <artifactId>cassandra-driver-core</artifactId> - <version>${driver.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.datastax.cassandra</groupId> - <artifactId>cassandra-driver-mapping</artifactId> - <version>${driver.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.cassandra</groupId> - <artifactId>cassandra-all</artifactId> - <version>${cassandra.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java deleted file mode 100644 index 849e023..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.batch.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.google.common.base.Strings; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.NonParallelInput; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * InputFormat to read data from Apache Cassandra and generate ${@link Tuple}. - * - * @param <OUT> type of Tuple - */ -public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { - private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); - - private final String query; - private final ClusterBuilder builder; - - private transient Cluster cluster; - private transient Session session; - private transient ResultSet resultSet; - - public CassandraInputFormat(String query, ClusterBuilder builder) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); - Preconditions.checkArgument(builder != null, "Builder cannot be null"); - - this.query = query; - this.builder = builder; - } - - @Override - public void configure(Configuration parameters) { - this.cluster = builder.getCluster(); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; - } - - /** - * Opens a Session and executes the query. - * - * @param ignored - * @throws IOException - */ - @Override - public void open(InputSplit ignored) throws IOException { - this.session = cluster.connect(); - this.resultSet = session.execute(query); - } - - @Override - public boolean reachedEnd() throws IOException { - return resultSet.isExhausted(); - } - - @Override - public OUT nextRecord(OUT reuse) throws IOException { - final Row item = resultSet.one(); - for (int i = 0; i < reuse.getArity(); i++) { - reuse.setField(item.getObject(i), i); - } - return reuse; - } - - @Override - public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; - return split; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - /** - * Closes all resources used. - */ - @Override - public void close() throws IOException { - try { - if (session != null) { - session.close(); - } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - - try { - if (cluster != null ) { - cluster.close(); - } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java deleted file mode 100644 index 15d8fb3..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.batch.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; -import com.google.common.base.Strings; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import org.apache.flink.api.common.io.RichOutputFormat; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra. - * - * @param <OUT> type of Tuple - */ -public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> { - private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class); - - private final String insertQuery; - private final ClusterBuilder builder; - - private transient Cluster cluster; - private transient Session session; - private transient PreparedStatement prepared; - private transient FutureCallback<ResultSet> callback; - private transient Throwable exception = null; - - public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty"); - Preconditions.checkArgument(builder != null, "Builder cannot be null"); - - this.insertQuery = insertQuery; - this.builder = builder; - } - - @Override - public void configure(Configuration parameters) { - this.cluster = builder.getCluster(); - } - - /** - * Opens a Session to Cassandra and initializes the prepared statement. - * - * @param taskNumber The number of the parallel instance. - * @throws IOException Thrown, if the output could not be opened due to an - * I/O problem. - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - this.session = cluster.connect(); - this.prepared = session.prepare(insertQuery); - this.callback = new FutureCallback<ResultSet>() { - @Override - public void onSuccess(ResultSet ignored) { - } - - @Override - public void onFailure(Throwable t) { - exception = t; - } - }; - } - - @Override - public void writeRecord(OUT record) throws IOException { - if (exception != null) { - throw new IOException("write record failed", exception); - } - - Object[] fields = new Object[record.getArity()]; - for (int i = 0; i < record.getArity(); i++) { - fields[i] = record.getField(i); - } - ResultSetFuture result = session.executeAsync(prepared.bind(fields)); - Futures.addCallback(result, callback); - } - - /** - * Closes all resources used. - */ - @Override - public void close() throws IOException { - try { - if (session != null) { - session.close(); - } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - - try { - if (cluster != null ) { - cluster.close(); - } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java deleted file mode 100644 index 63b76da..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -/** - * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra - * database. - * - * <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| - */ -public class CassandraCommitter extends CheckpointCommitter { - - private static final long serialVersionUID = 1L; - - private final ClusterBuilder builder; - private transient Cluster cluster; - private transient Session session; - - private String keySpace = "flink_auxiliary"; - private String table = "checkpoints_"; - - /** - * A cache of the last committed checkpoint ids per subtask index. This is used to - * avoid redundant round-trips to Cassandra (see {@link #isCheckpointCommitted(int, long)}. - */ - private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>(); - - public CassandraCommitter(ClusterBuilder builder) { - this.builder = builder; - ClosureCleaner.clean(builder, true); - } - - public CassandraCommitter(ClusterBuilder builder, String keySpace) { - this(builder); - this.keySpace = keySpace; - } - - /** - * Internally used to set the job ID after instantiation. - */ - public void setJobId(String id) throws Exception { - super.setJobId(id); - table += id; - } - - /** - * Generates the necessary tables to store information. - * - * @throws Exception - */ - @Override - public void createResource() throws Exception { - cluster = builder.getCluster(); - session = cluster.connect(); - - session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", keySpace)); - session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table)); - - try { - session.close(); - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - try { - cluster.close(); - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } - - @Override - public void open() throws Exception { - if (builder == null) { - throw new RuntimeException("No ClusterBuilder was set."); - } - cluster = builder.getCluster(); - session = cluster.connect(); - } - - @Override - public void close() throws Exception { - this.lastCommittedCheckpoints.clear(); - try { - session.close(); - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - try { - cluster.close(); - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } - - @Override - public void commitCheckpoint(int subtaskIdx, long checkpointId) { - String statement = String.format( - "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;", - keySpace, table, checkpointId, operatorId, subtaskIdx); - - session.execute(statement); - lastCommittedCheckpoints.put(subtaskIdx, checkpointId); - } - - @Override - public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) { - // Pending checkpointed buffers are committed in ascending order of their - // checkpoint id. This way we can tell if a checkpointed buffer was committed - // just by asking the third-party storage system for the last checkpoint id - // committed by the specified subtask. - - Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx); - if (lastCommittedCheckpoint == null) { - String statement = String.format( - "SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", - keySpace, table, operatorId, subtaskIdx); - - Iterator<Row> resultIt = session.execute(statement).iterator(); - if (resultIt.hasNext()) { - lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id"); - lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint); - } - } - return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java deleted file mode 100644 index 650c481..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.mapping.Mapper; -import com.datastax.driver.mapping.MappingManager; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.flink.configuration.Configuration; - -/** - * Flink Sink to save data into a Cassandra cluster using - * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>, - * which it uses annotations from - * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html"> - * com.datastax.driver.mapping.annotations</a>. - * - * @param <IN> Type of the elements emitted by this sink - */ -public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> { - - private static final long serialVersionUID = 1L; - - protected final Class<IN> clazz; - protected transient Mapper<IN> mapper; - protected transient MappingManager mappingManager; - - /** - * The main constructor for creating CassandraPojoSink - * - * @param clazz Class<IN> instance - */ - public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) { - super(builder); - this.clazz = clazz; - } - - @Override - public void open(Configuration configuration) { - super.open(configuration); - try { - this.mappingManager = new MappingManager(session); - this.mapper = mappingManager.mapper(clazz); - } catch (Exception e) { - throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e); - } - } - - @Override - public ListenableFuture<Void> send(IN value) { - return mapper.saveAsync(value); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java deleted file mode 100644 index 180b638..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.transformations.SinkTransformation; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; - -/** - * This class wraps different Cassandra sink implementations to provide a common interface for all of them. - * - * @param <IN> input type - */ -public class CassandraSink<IN> { - private final boolean useDataStreamSink; - private DataStreamSink<IN> sink1; - private SingleOutputStreamOperator<IN> sink2; - - private CassandraSink(DataStreamSink<IN> sink) { - sink1 = sink; - useDataStreamSink = true; - } - - private CassandraSink(SingleOutputStreamOperator<IN> sink) { - sink2 = sink; - useDataStreamSink = false; - } - - private SinkTransformation<IN> getSinkTransformation() { - return sink1.getTransformation(); - } - - private StreamTransformation<IN> getStreamTransformation() { - return sink2.getTransformation(); - } - - /** - * Sets the name of this sink. This name is - * used by the visualization and logging during runtime. - * - * @return The named sink. - */ - public CassandraSink<IN> name(String name) { - if (useDataStreamSink) { - getSinkTransformation().setName(name); - } else { - getStreamTransformation().setName(name); - } - return this; - } - - /** - * Sets an ID for this operator. - * <p/> - * <p>The specified ID is used to assign the same operator ID across job - * submissions (for example when starting a job from a savepoint). - * <p/> - * <p><strong>Important</strong>: this ID needs to be unique per - * transformation and job. Otherwise, job submission will fail. - * - * @param uid The unique user-specified ID of this transformation. - * @return The operator with the specified ID. - */ - public CassandraSink<IN> uid(String uid) { - if (useDataStreamSink) { - getSinkTransformation().setUid(uid); - } else { - getStreamTransformation().setUid(uid); - } - return this; - } - - /** - * Sets the parallelism for this sink. The degree must be higher than zero. - * - * @param parallelism The parallelism for this sink. - * @return The sink with set parallelism. - */ - public CassandraSink<IN> setParallelism(int parallelism) { - if (useDataStreamSink) { - getSinkTransformation().setParallelism(parallelism); - } else { - getStreamTransformation().setParallelism(parallelism); - } - return this; - } - - /** - * Turns off chaining for this operator so thread co-location will not be - * used as an optimization. - * <p/> - * <p/> - * Chaining can be turned off for the whole - * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()} - * however it is not advised for performance considerations. - * - * @return The sink with chaining disabled - */ - public CassandraSink<IN> disableChaining() { - if (useDataStreamSink) { - getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER); - } else { - getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER); - } - return this; - } - - /** - * Sets the slot sharing group of this operation. Parallel instances of - * operations that are in the same slot sharing group will be co-located in the same - * TaskManager slot, if possible. - * <p/> - * <p>Operations inherit the slot sharing group of input operations if all input operations - * are in the same slot sharing group and no slot sharing group was explicitly specified. - * <p/> - * <p>Initially an operation is in the default slot sharing group. An operation can be put into - * the default group explicitly by setting the slot sharing group to {@code "default"}. - * - * @param slotSharingGroup The slot sharing group name. - */ - public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) { - if (useDataStreamSink) { - getSinkTransformation().setSlotSharingGroup(slotSharingGroup); - } else { - getStreamTransformation().setSlotSharingGroup(slotSharingGroup); - } - return this; - } - - /** - * Writes a DataStream into a Cassandra database. - * - * @param input input DataStream - * @param <IN> input type - * @return CassandraSinkBuilder, to further configure the sink - */ - public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) { - if (input.getType() instanceof TupleTypeInfo) { - DataStream<T> tupleInput = (DataStream<T>) input; - return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig())); - } else { - return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig())); - } - } - - public abstract static class CassandraSinkBuilder<IN> { - protected final DataStream<IN> input; - protected final TypeSerializer<IN> serializer; - protected final TypeInformation<IN> typeInfo; - protected ClusterBuilder builder; - protected String query; - protected CheckpointCommitter committer; - protected boolean isWriteAheadLogEnabled; - - public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) { - this.input = input; - this.typeInfo = typeInfo; - this.serializer = serializer; - } - - /** - * Sets the query that is to be executed for every record. - * - * @param query query to use - * @return this builder - */ - public CassandraSinkBuilder<IN> setQuery(String query) { - this.query = query; - return this; - } - - /** - * Sets the cassandra host to connect to. - * - * @param host host to connect to - * @return this builder - */ - public CassandraSinkBuilder<IN> setHost(String host) { - return setHost(host, 9042); - } - - /** - * Sets the cassandra host/port to connect to. - * - * @param host host to connect to - * @param port port to connect to - * @return this builder - */ - public CassandraSinkBuilder<IN> setHost(final String host, final int port) { - if (this.builder != null) { - throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder()."); - } - this.builder = new ClusterBuilder() { - @Override - protected Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint(host).withPort(port).build(); - } - }; - return this; - } - - /** - * Sets the ClusterBuilder for this sink. A ClusterBuilder is used to configure the connection to cassandra. - * - * @param builder ClusterBuilder to configure the connection to cassandra - * @return this builder - */ - public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) { - if (this.builder != null) { - throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder()."); - } - this.builder = builder; - return this; - } - - /** - * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use - * idempotent updates. - * - * @return this builder - */ - public CassandraSinkBuilder<IN> enableWriteAheadLog() { - this.isWriteAheadLogEnabled = true; - return this; - } - - /** - * Enables the write-ahead log, which allows exactly-once processing for non-deterministic algorithms that use - * idempotent updates. - * - * @param committer CheckpointCommitter, that stores informationa bout completed checkpoints in an external - * resource. By default this information is stored within a separate table within Cassandra. - * @return this builder - */ - public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) { - this.isWriteAheadLogEnabled = true; - this.committer = committer; - return this; - } - - /** - * Finalizes the configuration of this sink. - * - * @return finalized sink - * @throws Exception - */ - public abstract CassandraSink<IN> build() throws Exception; - - protected void sanityCheck() { - if (builder == null) { - throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder()."); - } - } - } - - public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> { - public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) { - super(input, typeInfo, serializer); - } - - @Override - protected void sanityCheck() { - super.sanityCheck(); - if (query == null || query.length() == 0) { - throw new IllegalArgumentException("Query must not be null or empty."); - } - } - - @Override - public CassandraSink<IN> build() throws Exception { - sanityCheck(); - if (isWriteAheadLogEnabled) { - return committer == null - ? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, new CassandraCommitter(builder)))) - : new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink<>(query, serializer, builder, committer))); - } else { - return new CassandraSink<>(input.addSink(new CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink")); - } - } - } - - public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> { - public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) { - super(input, typeInfo, serializer); - } - - @Override - protected void sanityCheck() { - super.sanityCheck(); - if (query != null) { - throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input."); - } - } - - @Override - public CassandraSink<IN> build() throws Exception { - sanityCheck(); - if (isWriteAheadLogEnabled) { - throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types."); - } else { - return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink")); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java deleted file mode 100644 index 49b1efa..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}. - * - * @param <IN> Type of the elements emitted by this sink - */ -public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { - protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class); - protected transient Cluster cluster; - protected transient Session session; - - protected transient Throwable exception = null; - protected transient FutureCallback<V> callback; - - private final ClusterBuilder builder; - - protected CassandraSinkBase(ClusterBuilder builder) { - this.builder = builder; - ClosureCleaner.clean(builder, true); - } - - @Override - public void open(Configuration configuration) { - this.callback = new FutureCallback<V>() { - @Override - public void onSuccess(V ignored) { - } - - @Override - public void onFailure(Throwable t) { - exception = t; - LOG.error("Error while sending value.", t); - } - }; - this.cluster = builder.getCluster(); - this.session = cluster.connect(); - } - - @Override - public void invoke(IN value) throws Exception { - if (exception != null) { - throw new IOException("invoke() failed", exception); - } - ListenableFuture<V> result = send(value); - Futures.addCallback(result, callback); - } - - public abstract ListenableFuture<V> send(IN value); - - @Override - public void close() { - try { - if (session != null) { - session.close(); - } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - try { - if (cluster != null) { - cluster.close(); - } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java deleted file mode 100644 index 0a9ef06..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; - -/** - * Flink Sink to save data into a Cassandra cluster. - * - * @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple} - */ -public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> { - private final String insertQuery; - private transient PreparedStatement ps; - - public CassandraTupleSink(String insertQuery, ClusterBuilder builder) { - super(builder); - this.insertQuery = insertQuery; - } - - @Override - public void open(Configuration configuration) { - super.open(configuration); - this.ps = session.prepare(insertQuery); - } - - @Override - public ListenableFuture<ResultSet> send(IN value) { - Object[] fields = extract(value); - return session.executeAsync(ps.bind(fields)); - } - - private Object[] extract(IN record) { - Object[] al = new Object[record.getArity()]; - for (int i = 0; i < record.getArity(); i++) { - al[i] = record.getField(i); - } - return al; - } -}
