http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java deleted file mode 100644 index b4246f5..0000000 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.io.jdbc; - -import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Array; -import java.sql.Connection; -import java.sql.Date; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; -import java.util.Arrays; - -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * InputFormat to read data from a database and generate Rows. - * The InputFormat has to be configured using the supplied InputFormatBuilder. 
- * A valid RowTypeInfo must be properly configured in the builder, e.g.: </br> - * - * <pre><code> - * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { - * BasicTypeInfo.INT_TYPE_INFO, - * BasicTypeInfo.STRING_TYPE_INFO, - * BasicTypeInfo.STRING_TYPE_INFO, - * BasicTypeInfo.DOUBLE_TYPE_INFO, - * BasicTypeInfo.INT_TYPE_INFO - * }; - * - * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); - * - * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - * .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - * .setDBUrl("jdbc:derby:memory:ebookshop") - * .setQuery("select * from books") - * .setRowTypeInfo(rowTypeInfo) - * .finish(); - * </code></pre> - * - * In order to query the JDBC source in parallel, you need to provide a - * parameterized query template (i.e. a valid {@link PreparedStatement}) and - * a {@link ParameterValuesProvider} which provides binding values for the - * query parameters. E.g.:</br> - * - * <pre><code> - * - * Serializable[][] queryParameters = new String[2][1]; - * queryParameters[0] = new String[]{"Kumar"}; - * queryParameters[1] = new String[]{"Tan Ah Teck"}; - * - * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - * .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - * .setDBUrl("jdbc:derby:memory:ebookshop") - * .setQuery("select * from books WHERE author = ?") - * .setRowTypeInfo(rowTypeInfo) - * .setParametersProvider(new GenericParameterValuesProvider(queryParameters)) - * .finish(); - * </code></pre> - * - * @see Row - * @see ParameterValuesProvider - * @see PreparedStatement - * @see DriverManager - */ -public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); - - private String username; - private String password; - private String drivername; - private String dbURL; - private String 
queryTemplate; - private int resultSetType; - private int resultSetConcurrency; - private RowTypeInfo rowTypeInfo; - - private transient Connection dbConn; - private transient PreparedStatement statement; - private transient ResultSet resultSet; - - private boolean hasNext; - private Object[][] parameterValues; - - public JDBCInputFormat() { - } - - @Override - public RowTypeInfo getProducedType() { - return rowTypeInfo; - } - - @Override - public void configure(Configuration parameters) { - //do nothing here - } - - @Override - public void openInputFormat() { - //called once per inputFormat (on open) - try { - Class.forName(drivername); - if (username == null) { - dbConn = DriverManager.getConnection(dbURL); - } else { - dbConn = DriverManager.getConnection(dbURL, username, password); - } - statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); - } catch (SQLException se) { - throw new IllegalArgumentException("open() failed." + se.getMessage(), se); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe); - } - } - - @Override - public void closeInputFormat() { - //called once per inputFormat (on close) - try { - if(statement != null) { - statement.close(); - } - } catch (SQLException se) { - LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage()); - } finally { - statement = null; - } - - try { - if(dbConn != null) { - dbConn.close(); - } - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } finally { - dbConn = null; - } - - parameterValues = null; - } - - /** - * Connects to the source database and executes the query in a <b>parallel - * fashion</b> if - * this {@link InputFormat} is built using a parameterized query (i.e. using - * a {@link PreparedStatement}) - * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel - * fashion</b> otherwise. 
- * - * @param inputSplit which is ignored if this InputFormat is executed as a - * non-parallel source, - * a "hook" to the query parameters otherwise (using its - * <i>splitNumber</i>) - * @throws IOException if there's an error during the execution of the query - */ - @Override - public void open(InputSplit inputSplit) throws IOException { - try { - if (inputSplit != null && parameterValues != null) { - for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) { - Object param = parameterValues[inputSplit.getSplitNumber()][i]; - if (param instanceof String) { - statement.setString(i + 1, (String) param); - } else if (param instanceof Long) { - statement.setLong(i + 1, (Long) param); - } else if (param instanceof Integer) { - statement.setInt(i + 1, (Integer) param); - } else if (param instanceof Double) { - statement.setDouble(i + 1, (Double) param); - } else if (param instanceof Boolean) { - statement.setBoolean(i + 1, (Boolean) param); - } else if (param instanceof Float) { - statement.setFloat(i + 1, (Float) param); - } else if (param instanceof BigDecimal) { - statement.setBigDecimal(i + 1, (BigDecimal) param); - } else if (param instanceof Byte) { - statement.setByte(i + 1, (Byte) param); - } else if (param instanceof Short) { - statement.setShort(i + 1, (Short) param); - } else if (param instanceof Date) { - statement.setDate(i + 1, (Date) param); - } else if (param instanceof Time) { - statement.setTime(i + 1, (Time) param); - } else if (param instanceof Timestamp) { - statement.setTimestamp(i + 1, (Timestamp) param); - } else if (param instanceof Array) { - statement.setArray(i + 1, (Array) param); - } else { - //extends with other types if needed - throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." 
); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()]))); - } - } - resultSet = statement.executeQuery(); - hasNext = resultSet.next(); - } catch (SQLException se) { - throw new IllegalArgumentException("open() failed." + se.getMessage(), se); - } - } - - /** - * Closes all resources used. - * - * @throws IOException Indicates that a resource could not be closed. - */ - @Override - public void close() throws IOException { - if(resultSet == null) { - return; - } - try { - resultSet.close(); - } catch (SQLException se) { - LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage()); - } - } - - /** - * Checks whether all data has been read. - * - * @return boolean value indication whether all data has been read. - * @throws IOException - */ - @Override - public boolean reachedEnd() throws IOException { - return !hasNext; - } - - /** - * Stores the next resultSet row in a tuple - * - * @param row row to be reused. 
- * @return row containing next {@link Row} - * @throws java.io.IOException - */ - @Override - public Row nextRecord(Row row) throws IOException { - try { - if (!hasNext) { - return null; - } - for (int pos = 0; pos < row.productArity(); pos++) { - row.setField(pos, resultSet.getObject(pos + 1)); - } - //update hasNext after we've read the record - hasNext = resultSet.next(); - return row; - } catch (SQLException se) { - throw new IOException("Couldn't read data - " + se.getMessage(), se); - } catch (NullPointerException npe) { - throw new IOException("Couldn't access resultSet", npe); - } - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; - } - - @Override - public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - if (parameterValues == null) { - return new GenericInputSplit[]{new GenericInputSplit(0, 1)}; - } - GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length]; - for (int i = 0; i < ret.length; i++) { - ret[i] = new GenericInputSplit(i, ret.length); - } - return ret; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - /** - * A builder used to set parameters to the output format's configuration in a fluent way. 
- * @return builder - */ - public static JDBCInputFormatBuilder buildJDBCInputFormat() { - return new JDBCInputFormatBuilder(); - } - - public static class JDBCInputFormatBuilder { - private final JDBCInputFormat format; - - public JDBCInputFormatBuilder() { - this.format = new JDBCInputFormat(); - //using TYPE_FORWARD_ONLY for high performance reads - this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY; - this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; - } - - public JDBCInputFormatBuilder setUsername(String username) { - format.username = username; - return this; - } - - public JDBCInputFormatBuilder setPassword(String password) { - format.password = password; - return this; - } - - public JDBCInputFormatBuilder setDrivername(String drivername) { - format.drivername = drivername; - return this; - } - - public JDBCInputFormatBuilder setDBUrl(String dbURL) { - format.dbURL = dbURL; - return this; - } - - public JDBCInputFormatBuilder setQuery(String query) { - format.queryTemplate = query; - return this; - } - - public JDBCInputFormatBuilder setResultSetType(int resultSetType) { - format.resultSetType = resultSetType; - return this; - } - - public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) { - format.resultSetConcurrency = resultSetConcurrency; - return this; - } - - public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) { - format.parameterValues = parameterValuesProvider.getParameterValues(); - return this; - } - - public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) { - format.rowTypeInfo = rowTypeInfo; - return this; - } - - public JDBCInputFormat finish() { - if (format.username == null) { - LOG.info("Username was not supplied separately."); - } - if (format.password == null) { - LOG.info("Password was not supplied separately."); - } - if (format.dbURL == null) { - throw new IllegalArgumentException("No database URL supplied"); - } - if 
(format.queryTemplate == null) { - throw new IllegalArgumentException("No query supplied"); - } - if (format.drivername == null) { - throw new IllegalArgumentException("No driver supplied"); - } - if (format.rowTypeInfo == null) { - throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied"); - } - if (format.parameterValues == null) { - LOG.debug("No input splitting configured (data will be read with parallelism 1)."); - } - return format; - } - - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java deleted file mode 100644 index da4b1ad..0000000 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import org.apache.flink.api.common.io.RichOutputFormat; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.table.Row; -import org.apache.flink.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * OutputFormat to write tuples into a database. 
- * The OutputFormat has to be configured using the supplied OutputFormatBuilder. - * - * @see Tuple - * @see DriverManager - */ -public class JDBCOutputFormat extends RichOutputFormat<Row> { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); - - private String username; - private String password; - private String drivername; - private String dbURL; - private String query; - private int batchInterval = 5000; - - private Connection dbConn; - private PreparedStatement upload; - - private int batchCount = 0; - - public int[] typesArray; - - public JDBCOutputFormat() { - } - - @Override - public void configure(Configuration parameters) { - } - - /** - * Connects to the target database and initializes the prepared statement. - * - * @param taskNumber The number of the parallel instance. - * @throws IOException Thrown, if the output could not be opened due to an - * I/O problem. - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - try { - establishConnection(); - upload = dbConn.prepareStatement(query); - } catch (SQLException sqe) { - throw new IllegalArgumentException("open() failed.", sqe); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("JDBC driver class not found.", cnfe); - } - } - - private void establishConnection() throws SQLException, ClassNotFoundException { - Class.forName(drivername); - if (username == null) { - dbConn = DriverManager.getConnection(dbURL); - } else { - dbConn = DriverManager.getConnection(dbURL, username, password); - } - } - - /** - * Adds a record to the prepared statement. - * <p> - * When this method is called, the output format is guaranteed to be opened. 
- * </p> - * - * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to - * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null)) - * - * @param row The records to add to the output. - * @see PreparedStatement - * @throws IOException Thrown, if the records could not be added due to an I/O problem. - */ - @Override - public void writeRecord(Row row) throws IOException { - - if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) { - LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); - } - try { - - if (typesArray == null ) { - // no types provided - for (int index = 0; index < row.productArity(); index++) { - LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index)); - upload.setObject(index + 1, row.productElement(index)); - } - } else { - // types provided - for (int index = 0; index < row.productArity(); index++) { - - if (row.productElement(index) == null) { - upload.setNull(index + 1, typesArray[index]); - } else { - // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html - switch (typesArray[index]) { - case java.sql.Types.NULL: - upload.setNull(index + 1, typesArray[index]); - break; - case java.sql.Types.BOOLEAN: - case java.sql.Types.BIT: - upload.setBoolean(index + 1, (boolean) row.productElement(index)); - break; - case java.sql.Types.CHAR: - case java.sql.Types.NCHAR: - case java.sql.Types.VARCHAR: - case java.sql.Types.LONGVARCHAR: - case java.sql.Types.LONGNVARCHAR: - upload.setString(index + 1, (String) row.productElement(index)); - break; - case java.sql.Types.TINYINT: - upload.setByte(index + 1, (byte) row.productElement(index)); - break; - case java.sql.Types.SMALLINT: - upload.setShort(index + 1, (short) row.productElement(index)); - 
break; - case java.sql.Types.INTEGER: - upload.setInt(index + 1, (int) row.productElement(index)); - break; - case java.sql.Types.BIGINT: - upload.setLong(index + 1, (long) row.productElement(index)); - break; - case java.sql.Types.REAL: - upload.setFloat(index + 1, (float) row.productElement(index)); - break; - case java.sql.Types.FLOAT: - case java.sql.Types.DOUBLE: - upload.setDouble(index + 1, (double) row.productElement(index)); - break; - case java.sql.Types.DECIMAL: - case java.sql.Types.NUMERIC: - upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index)); - break; - case java.sql.Types.DATE: - upload.setDate(index + 1, (java.sql.Date) row.productElement(index)); - break; - case java.sql.Types.TIME: - upload.setTime(index + 1, (java.sql.Time) row.productElement(index)); - break; - case java.sql.Types.TIMESTAMP: - upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index)); - break; - case java.sql.Types.BINARY: - case java.sql.Types.VARBINARY: - case java.sql.Types.LONGVARBINARY: - upload.setBytes(index + 1, (byte[]) row.productElement(index)); - break; - default: - upload.setObject(index + 1, row.productElement(index)); - LOG.warn("Unmanaged sql type (%s) for column %s. 
Best effort approach to set its value: %s.", - typesArray[index], index + 1, row.productElement(index)); - // case java.sql.Types.SQLXML - // case java.sql.Types.ARRAY: - // case java.sql.Types.JAVA_OBJECT: - // case java.sql.Types.BLOB: - // case java.sql.Types.CLOB: - // case java.sql.Types.NCLOB: - // case java.sql.Types.DATALINK: - // case java.sql.Types.DISTINCT: - // case java.sql.Types.OTHER: - // case java.sql.Types.REF: - // case java.sql.Types.ROWID: - // case java.sql.Types.STRUC - } - } - } - } - upload.addBatch(); - batchCount++; - if (batchCount >= batchInterval) { - upload.executeBatch(); - batchCount = 0; - } - } catch (SQLException | IllegalArgumentException e) { - throw new IllegalArgumentException("writeRecord() failed", e); - } - } - - /** - * Executes prepared statement and closes all resources of this instance. - * - * @throws IOException Thrown, if the input could not be closed properly. - */ - @Override - public void close() throws IOException { - try { - if (upload != null) { - upload.executeBatch(); - upload.close(); - } - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } finally { - upload = null; - batchCount = 0; - } - - try { - if (dbConn != null) { - dbConn.close(); - } - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } finally { - dbConn = null; - } - } - - public static JDBCOutputFormatBuilder buildJDBCOutputFormat() { - return new JDBCOutputFormatBuilder(); - } - - public static class JDBCOutputFormatBuilder { - private final JDBCOutputFormat format; - - protected JDBCOutputFormatBuilder() { - this.format = new JDBCOutputFormat(); - } - - public JDBCOutputFormatBuilder setUsername(String username) { - format.username = username; - return this; - } - - public JDBCOutputFormatBuilder setPassword(String password) { - format.password = password; - return this; - } - - public JDBCOutputFormatBuilder setDrivername(String drivername) { - 
format.drivername = drivername; - return this; - } - - public JDBCOutputFormatBuilder setDBUrl(String dbURL) { - format.dbURL = dbURL; - return this; - } - - public JDBCOutputFormatBuilder setQuery(String query) { - format.query = query; - return this; - } - - public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) { - format.batchInterval = batchInterval; - return this; - } - - public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) { - format.typesArray = typesArray; - return this; - } - - /** - * Finalizes the configuration and checks validity. - * - * @return Configured JDBCOutputFormat - */ - public JDBCOutputFormat finish() { - if (format.username == null) { - LOG.info("Username was not supplied separately."); - } - if (format.password == null) { - LOG.info("Password was not supplied separately."); - } - if (format.dbURL == null) { - throw new IllegalArgumentException("No dababase URL supplied."); - } - if (format.query == null) { - throw new IllegalArgumentException("No query suplied"); - } - if (format.drivername == null) { - throw new IllegalArgumentException("No driver supplied"); - } - - return format; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java deleted file mode 100644 index 2ed2f8c..0000000 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.io.jdbc.split; - -import java.io.Serializable; - -import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; - -/** - * - * This splits generator actually does nothing but wrapping the query parameters - * computed by the user before creating the {@link JDBCInputFormat} instance. 
- * - * */ -public class GenericParameterValuesProvider implements ParameterValuesProvider { - - private final Serializable[][] parameters; - - public GenericParameterValuesProvider(Serializable[][] parameters) { - this.parameters = parameters; - } - - @Override - public Serializable[][] getParameterValues(){ - //do nothing...precomputed externally - return parameters; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java deleted file mode 100644 index ac56b98..0000000 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.java.io.jdbc.split; - -import java.io.Serializable; - -/** - * - * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column. - * The generated query set will be of size equal to the configured fetchSize (apart the last one range), - * ranging from the min value up to the max. - * - * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like: - * <PRE> - * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ? - * </PRE> - * - * you can use this class to automatically generate the parameters of the BETWEEN clause, - * based on the passed constructor parameters. - * - * */ -public class NumericBetweenParametersProvider implements ParameterValuesProvider { - - private long fetchSize; - private final long min; - private final long max; - - public NumericBetweenParametersProvider(long fetchSize, long min, long max) { - this.fetchSize = fetchSize; - this.min = min; - this.max = max; - } - - @Override - public Serializable[][] getParameterValues(){ - double maxElemCount = (max - min) + 1; - int size = new Double(Math.ceil(maxElemCount / fetchSize)).intValue(); - Serializable[][] parameters = new Serializable[size][2]; - int count = 0; - for (long i = min; i < max; i += fetchSize, count++) { - long currentLimit = i + fetchSize - 1; - parameters[count] = new Long[]{i,currentLimit}; - if (currentLimit + 1 + fetchSize > max) { - parameters[count + 1] = new Long[]{currentLimit + 1, max}; - break; - } - } - return parameters; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java deleted file mode 100644 index c194497..0000000 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.io.jdbc.split; - -import java.io.Serializable; - -import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; - -/** - * - * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits). 
/**
 * Used by the {@link JDBCInputFormat} to compute the list of queries to run in parallel
 * (i.e. the splits). Each query is parameterized with one row of the matrix returned by the
 * implementation.
 */
public interface ParameterValuesProvider {

    /**
     * Returns the parameter matrix used to query a table in parallel.
     *
     * @return the parameter values, one row per query
     */
    Serializable[][] getParameterValues();

}
- */ - -package org.apache.flink.api.java.io.jdbc; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Types; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; -import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; -import org.apache.flink.api.table.Row; -import org.junit.Assert; -import org.junit.Test; - -public class JDBCFullTest extends JDBCTestBase { - - @Test - public void testJdbcInOut() throws Exception { - //run without parallelism - runTest(false); - - //cleanup - JDBCTestBase.tearDownClass(); - JDBCTestBase.prepareTestDb(); - - //run expliting parallelism - runTest(true); - - } - - private void runTest(boolean exploitParallelism) { - ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); - JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(JDBCTestBase.DRIVER_CLASS) - .setDBUrl(JDBCTestBase.DB_URL) - .setQuery(JDBCTestBase.SELECT_ALL_BOOKS) - .setRowTypeInfo(rowTypeInfo); - - if(exploitParallelism) { - final int fetchSize = 1; - final Long min = new Long(JDBCTestBase.testData[0][0].toString()); - final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString()); - //use a "splittable" query to exploit parallelism - inputBuilder = inputBuilder - .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) - .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max)); - } - DataSet<Row> source = environment.createInput(inputBuilder.finish()); - - //NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but - //some database, doens't handle correctly null values when no column type specified - //in PreparedStatement.setObject (see its javadoc 
for more details) - source.output(JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername(JDBCTestBase.DRIVER_CLASS) - .setDBUrl(JDBCTestBase.DB_URL) - .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") - .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER}) - .finish()); - try { - environment.execute(); - } catch (Exception e) { - Assert.fail("JDBC full test failed. " + e.getMessage()); - } - - try ( - Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); - PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS); - ResultSet resultSet = statement.executeQuery() - ) { - int count = 0; - while (resultSet.next()) { - count++; - } - Assert.assertEquals(JDBCTestBase.testData.length, count); - } catch (SQLException e) { - Assert.fail("JDBC full test failed. " + e.getMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java deleted file mode 100644 index efae076..0000000 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io.jdbc; - -import java.io.IOException; -import java.io.Serializable; -import java.sql.ResultSet; - -import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider; -import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; -import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; -import org.apache.flink.api.table.Row; -import org.apache.flink.core.io.InputSplit; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; - -public class JDBCInputFormatTest extends JDBCTestBase { - - private JDBCInputFormat jdbcInputFormat; - - @After - public void tearDown() throws IOException { - if (jdbcInputFormat != null) { - jdbcInputFormat.close(); - } - jdbcInputFormat = null; - } - - @Test(expected = IllegalArgumentException.class) - public void testUntypedRowInfo() throws IOException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.idontexist") - .setDBUrl(DB_URL) - .setQuery(SELECT_ALL_BOOKS) - .finish(); - jdbcInputFormat.openInputFormat(); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidDriver() throws IOException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.idontexist") - .setDBUrl(DB_URL) - .setQuery(SELECT_ALL_BOOKS) - .setRowTypeInfo(rowTypeInfo) - .finish(); - jdbcInputFormat.openInputFormat(); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidURL() throws IOException { - 
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") - .setQuery(SELECT_ALL_BOOKS) - .setRowTypeInfo(rowTypeInfo) - .finish(); - jdbcInputFormat.openInputFormat(); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidQuery() throws IOException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery("iamnotsql") - .setRowTypeInfo(rowTypeInfo) - .finish(); - jdbcInputFormat.openInputFormat(); - } - - @Test(expected = IllegalArgumentException.class) - public void testIncompleteConfiguration() throws IOException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setQuery(SELECT_ALL_BOOKS) - .setRowTypeInfo(rowTypeInfo) - .finish(); - } - - @Test - public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery(SELECT_ALL_BOOKS) - .setRowTypeInfo(rowTypeInfo) - .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) - .finish(); - //this query does not exploit parallelism - Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length); - jdbcInputFormat.openInputFormat(); - jdbcInputFormat.open(null); - Row row = new Row(5); - int recordCount = 0; - while (!jdbcInputFormat.reachedEnd()) { - Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - - if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} - if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} - if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} - 
if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} - if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} - - for (int x = 0; x < 5; x++) { - if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.productElement(x)); - } - } - recordCount++; - } - jdbcInputFormat.close(); - jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(testData.length, recordCount); - } - - @Test - public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException { - final int fetchSize = 1; - final Long min = new Long(JDBCTestBase.testData[0][0] + ""); - final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + ""); - ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) - .setRowTypeInfo(rowTypeInfo) - .setParametersProvider(pramProvider) - .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) - .finish(); - - jdbcInputFormat.openInputFormat(); - InputSplit[] splits = jdbcInputFormat.createInputSplits(1); - //this query exploit parallelism (1 split for every id) - Assert.assertEquals(testData.length, splits.length); - int recordCount = 0; - Row row = new Row(5); - for (int i = 0; i < splits.length; i++) { - jdbcInputFormat.open(splits[i]); - while (!jdbcInputFormat.reachedEnd()) { - Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} - if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", 
String.class, next.productElement(1).getClass());} - if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} - if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} - if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} - - for (int x = 0; x < 5; x++) { - if(testData[recordCount][x]!=null) { - Assert.assertEquals(testData[recordCount][x], next.productElement(x)); - } - } - recordCount++; - } - jdbcInputFormat.close(); - } - jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(testData.length, recordCount); - } - - @Test - public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException { - Serializable[][] queryParameters = new String[2][1]; - queryParameters[0] = new String[]{"Kumar"}; - queryParameters[1] = new String[]{"Tan Ah Teck"}; - ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters); - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR) - .setRowTypeInfo(rowTypeInfo) - .setParametersProvider(paramProvider) - .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) - .finish(); - jdbcInputFormat.openInputFormat(); - InputSplit[] splits = jdbcInputFormat.createInputSplits(1); - //this query exploit parallelism (1 split for every queryParameters row) - Assert.assertEquals(queryParameters.length, splits.length); - int recordCount = 0; - Row row = new Row(5); - for (int i = 0; i < splits.length; i++) { - jdbcInputFormat.open(splits[i]); - while (!jdbcInputFormat.reachedEnd()) { - Row next = jdbcInputFormat.nextRecord(row); - if (next == null) { - break; - } - if(next.productElement(0)!=null) { 
Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} - if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} - if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} - if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} - if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} - - recordCount++; - } - jdbcInputFormat.close(); - } - Assert.assertEquals(3, recordCount); - jdbcInputFormat.closeInputFormat(); - } - - @Test - public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery(SELECT_EMPTY) - .setRowTypeInfo(rowTypeInfo) - .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) - .finish(); - jdbcInputFormat.openInputFormat(); - jdbcInputFormat.open(null); - Row row = new Row(5); - int recordsCnt = 0; - while (!jdbcInputFormat.reachedEnd()) { - Assert.assertNull(jdbcInputFormat.nextRecord(row)); - recordsCnt++; - } - jdbcInputFormat.close(); - jdbcInputFormat.closeInputFormat(); - Assert.assertEquals(0, recordsCnt); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java deleted file mode 100644 index 
086a84c..0000000 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io.jdbc; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; - -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.table.Row; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; - -public class JDBCOutputFormatTest extends JDBCTestBase { - - private JDBCOutputFormat jdbcOutputFormat; - private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>(); - - @After - public void tearDown() throws IOException { - if (jdbcOutputFormat != null) { - jdbcOutputFormat.close(); - } - jdbcOutputFormat = null; - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidDriver() throws IOException { - jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.idontexist") - .setDBUrl(DB_URL) - 
.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) - .finish(); - jdbcOutputFormat.open(0, 1); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidURL() throws IOException { - jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") - .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) - .finish(); - jdbcOutputFormat.open(0, 1); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidQuery() throws IOException { - jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery("iamnotsql") - .finish(); - jdbcOutputFormat.open(0, 1); - } - - @Test(expected = IllegalArgumentException.class) - public void testIncompleteConfiguration() throws IOException { - jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername(DRIVER_CLASS) - .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) - .finish(); - } - - - @Test(expected = IllegalArgumentException.class) - public void testIncompatibleTypes() throws IOException { - jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) - .finish(); - jdbcOutputFormat.open(0, 1); - - tuple5.setField(4, 0); - tuple5.setField("hello", 1); - tuple5.setField("world", 2); - tuple5.setField(0.99, 3); - tuple5.setField("imthewrongtype", 4); - - Row row = new Row(tuple5.getArity()); - for (int i = 0; i < tuple5.getArity(); i++) { - row.setField(i, tuple5.getField(i)); - } - jdbcOutputFormat.writeRecord(row); - jdbcOutputFormat.close(); - } - - @Test - public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException { - - jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername(DRIVER_CLASS) - .setDBUrl(DB_URL) - .setQuery(String.format(INSERT_TEMPLATE, 
OUTPUT_TABLE)) - .finish(); - jdbcOutputFormat.open(0, 1); - - for (int i = 0; i < testData.length; i++) { - Row row = new Row(testData[i].length); - for (int j = 0; j < testData[i].length; j++) { - row.setField(j, testData[i][j]); - } - jdbcOutputFormat.writeRecord(row); - } - - jdbcOutputFormat.close(); - - try ( - Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); - PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS); - ResultSet resultSet = statement.executeQuery() - ) { - int recordCount = 0; - while (resultSet.next()) { - Row row = new Row(tuple5.getArity()); - for (int i = 0; i < tuple5.getArity(); i++) { - row.setField(i, resultSet.getObject(i + 1)); - } - if (row.productElement(0) != null) { - Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass()); - } - if (row.productElement(1) != null) { - Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass()); - } - if (row.productElement(2) != null) { - Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass()); - } - if (row.productElement(3) != null) { - Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass()); - } - if (row.productElement(4) != null) { - Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass()); - } - - for (int x = 0; x < tuple5.getArity(); x++) { - if (JDBCTestBase.testData[recordCount][x] != null) { - Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x)); - } - } - - recordCount++; - } - Assert.assertEquals(JDBCTestBase.testData.length, recordCount); - } catch (SQLException e) { - Assert.fail("JDBC OutputFormat test failed. 
" + e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java deleted file mode 100644 index 69ad693..0000000 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.java.io.jdbc; - -import java.io.OutputStream; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; - -/** - * Base test class for JDBC Input and Output formats - */ -public class JDBCTestBase { - - public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; - public static final String DB_URL = "jdbc:derby:memory:ebookshop"; - public static final String INPUT_TABLE = "books"; - public static final String OUTPUT_TABLE = "newbooks"; - public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; - public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE; - public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0"; - public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)"; - public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? 
AND ?"; - public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?"; - - protected static Connection conn; - - public static final Object[][] testData = { - {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}, - {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66}, - {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77}, - {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88}, - {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99}, - {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}}; - - public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - }; - - public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); - - public static String getCreateQuery(String tableName) { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE "); - sqlQueryBuilder.append(tableName).append(" ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - return sqlQueryBuilder.toString(); - } - - public static String getInsertQuery() { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - for (int i = 0; i < JDBCTestBase.testData.length; i++) { - sqlQueryBuilder.append("(") - 
.append(JDBCTestBase.testData[i][0]).append(",'") - .append(JDBCTestBase.testData[i][1]).append("','") - .append(JDBCTestBase.testData[i][2]).append("',") - .append(JDBCTestBase.testData[i][3]).append(",") - .append(JDBCTestBase.testData[i][4]).append(")"); - if (i < JDBCTestBase.testData.length - 1) { - sqlQueryBuilder.append(","); - } - } - String insertQuery = sqlQueryBuilder.toString(); - return insertQuery; - } - - public static final OutputStream DEV_NULL = new OutputStream() { - @Override - public void write(int b) { - } - }; - - public static void prepareTestDb() throws Exception { - System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); - Class.forName(DRIVER_CLASS); - Connection conn = DriverManager.getConnection(DB_URL + ";create=true"); - - //create input table - Statement stat = conn.createStatement(); - stat.executeUpdate(getCreateQuery(INPUT_TABLE)); - stat.close(); - - //create output table - stat = conn.createStatement(); - stat.executeUpdate(getCreateQuery(OUTPUT_TABLE)); - stat.close(); - - //prepare input data - stat = conn.createStatement(); - stat.execute(JDBCTestBase.getInsertQuery()); - stat.close(); - - conn.close(); - } - - @BeforeClass - public static void setUpClass() throws SQLException { - try { - System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL"); - prepareDerbyDatabase(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { - Class.forName(DRIVER_CLASS); - conn = DriverManager.getConnection(DB_URL + ";create=true"); - createTable(INPUT_TABLE); - createTable(OUTPUT_TABLE); - insertDataIntoInputTable(); - conn.close(); - } - - private static void createTable(String tableName) throws SQLException { - Statement stat = conn.createStatement(); - stat.executeUpdate(getCreateQuery(tableName)); - stat.close(); - } - - 
private static void insertDataIntoInputTable() throws SQLException { - Statement stat = conn.createStatement(); - stat.execute(JDBCTestBase.getInsertQuery()); - stat.close(); - } - - @AfterClass - public static void tearDownClass() { - cleanUpDerbyDatabases(); - } - - private static void cleanUpDerbyDatabases() { - try { - Class.forName(DRIVER_CLASS); - conn = DriverManager.getConnection(DB_URL + ";create=true"); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE "+INPUT_TABLE); - stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties deleted file mode 100644 index 2fb9345..0000000 --- a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,19 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml deleted file mode 100644 index 8b3bb27..0000000 --- a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/pom.xml b/flink-batch-connectors/pom.xml deleted file mode 100644 index d4f65b3..0000000 --- a/flink-batch-connectors/pom.xml +++ /dev/null @@ -1,45 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-parent</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - - <artifactId>flink-batch-connectors</artifactId> - <name>flink-batch-connectors</name> - <packaging>pom</packaging> - - <modules> - <module>flink-avro</module> - <module>flink-jdbc</module> - <module>flink-hadoop-compatibility</module> - <module>flink-hbase</module> - <module>flink-hcatalog</module> - </modules> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml new file mode 100644 index 0000000..cdd7c78 --- /dev/null +++ b/flink-connectors/flink-avro/pom.xml @@ -0,0 +1,216 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-avro_2.10</artifactId> + <name>flink-avro</name> + + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <!-- version is derived from base module --> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>create-test-dependency</id> + <phase>process-test-classes</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <archive> + <manifest> + 
<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass> + </manifest> + </archive> + <finalName>maven</finalName> + <attach>false</attach> + <descriptors> + <descriptor>src/test/assembly/test-assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + <!--Remove the AvroExternalJarProgram code from the test-classes directory since it musn't be in the + classpath when running the tests to actually test whether the user code class loader + is properly used.--> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>2.5</version><!--$NO-MVN-MAN-VER$--> + <executions> + <execution> + <id>remove-avroexternalprogram</id> + <phase>process-test-classes</phase> + <goals> + <goal>clean</goal> + </goals> + <configuration> + <excludeDefaultDirectories>true</excludeDefaultDirectories> + <filesets> + <fileset> + <directory>${project.build.testOutputDirectory}</directory> + <includes> + <include>**/testjar/*.class</include> + </includes> + </fileset> + </filesets> + </configuration> + </execution> + </executions> + </plugin> + <!-- Generate Test class from avro schema --> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>1.7.7</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> + <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <versionRange>[2.4,)</versionRange> + <goals> + <goal>single</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-clean-plugin</artifactId> + <versionRange>[1,)</versionRange> + <goals> + <goal>clean</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <versionRange>[1.7.7,)</versionRange> + <goals> + <goal>schema</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java new file mode 100644 index 0000000..59da4cb --- /dev/null +++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.avro; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.avro.io.Decoder; +import org.apache.avro.util.Utf8; + + +public class DataInputDecoder extends Decoder { + + private final Utf8 stringDecoder = new Utf8(); + + private DataInput in; + + public void setIn(DataInput in) { + this.in = in; + } + + // -------------------------------------------------------------------------------------------- + // primitives + // -------------------------------------------------------------------------------------------- + + @Override + public void readNull() {} + + + @Override + public boolean readBoolean() throws IOException { + return in.readBoolean(); + } + + @Override + public int readInt() throws IOException { + return in.readInt(); + } + + @Override + public long readLong() throws IOException { + return in.readLong(); + } + + @Override + public float readFloat() throws IOException { + return in.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return in.readDouble(); + } + + @Override + public int readEnum() throws IOException { + return readInt(); + } + + // -------------------------------------------------------------------------------------------- + // bytes + 
// -------------------------------------------------------------------------------------------- + + @Override + public void readFixed(byte[] bytes, int start, int length) throws IOException { + in.readFully(bytes, start, length); + } + + @Override + public ByteBuffer readBytes(ByteBuffer old) throws IOException { + int length = readInt(); + ByteBuffer result; + if (old != null && length <= old.capacity() && old.hasArray()) { + result = old; + result.clear(); + } else { + result = ByteBuffer.allocate(length); + } + in.readFully(result.array(), result.arrayOffset() + result.position(), length); + result.limit(length); + return result; + } + + + @Override + public void skipFixed(int length) throws IOException { + skipBytes(length); + } + + @Override + public void skipBytes() throws IOException { + int num = readInt(); + skipBytes(num); + } + + // -------------------------------------------------------------------------------------------- + // strings + // -------------------------------------------------------------------------------------------- + + + @Override + public Utf8 readString(Utf8 old) throws IOException { + int length = readInt(); + Utf8 result = (old != null ? 
old : new Utf8()); + result.setByteLength(length); + + if (length > 0) { + in.readFully(result.getBytes(), 0, length); + } + + return result; + } + + @Override + public String readString() throws IOException { + return readString(stringDecoder).toString(); + } + + @Override + public void skipString() throws IOException { + int len = readInt(); + skipBytes(len); + } + + // -------------------------------------------------------------------------------------------- + // collection types + // -------------------------------------------------------------------------------------------- + + @Override + public long readArrayStart() throws IOException { + return readVarLongCount(in); + } + + @Override + public long arrayNext() throws IOException { + return readVarLongCount(in); + } + + @Override + public long skipArray() throws IOException { + return readVarLongCount(in); + } + + @Override + public long readMapStart() throws IOException { + return readVarLongCount(in); + } + + @Override + public long mapNext() throws IOException { + return readVarLongCount(in); + } + + @Override + public long skipMap() throws IOException { + return readVarLongCount(in); + } + + // -------------------------------------------------------------------------------------------- + // union + // -------------------------------------------------------------------------------------------- + + @Override + public int readIndex() throws IOException { + return readInt(); + } + + // -------------------------------------------------------------------------------------------- + // utils + // -------------------------------------------------------------------------------------------- + + private void skipBytes(int num) throws IOException { + while (num > 0) { + num -= in.skipBytes(num); + } + } + + public static long readVarLongCount(DataInput in) throws IOException { + long value = in.readUnsignedByte(); + + if ((value & 0x80) == 0) { + return value; + } + else { + long curr; + int shift = 7; + value = 
value & 0x7f; + while (((curr = in.readUnsignedByte()) & 0x80) != 0){ + value |= (curr & 0x7f) << shift; + shift += 7; + } + value |= curr << shift; + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java new file mode 100644 index 0000000..0102cc1 --- /dev/null +++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.avro; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.avro.io.Encoder; +import org.apache.avro.util.Utf8; + + +public final class DataOutputEncoder extends Encoder implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private DataOutput out; + + + public void setOut(DataOutput out) { + this.out = out; + } + + + @Override + public void flush() throws IOException {} + + // -------------------------------------------------------------------------------------------- + // primitives + // -------------------------------------------------------------------------------------------- + + @Override + public void writeNull() {} + + + @Override + public void writeBoolean(boolean b) throws IOException { + out.writeBoolean(b); + } + + @Override + public void writeInt(int n) throws IOException { + out.writeInt(n); + } + + @Override + public void writeLong(long n) throws IOException { + out.writeLong(n); + } + + @Override + public void writeFloat(float f) throws IOException { + out.writeFloat(f); + } + + @Override + public void writeDouble(double d) throws IOException { + out.writeDouble(d); + } + + @Override + public void writeEnum(int e) throws IOException { + out.writeInt(e); + } + + + // -------------------------------------------------------------------------------------------- + // bytes + // -------------------------------------------------------------------------------------------- + + @Override + public void writeFixed(byte[] bytes, int start, int len) throws IOException { + out.write(bytes, start, len); + } + + @Override + public void writeBytes(byte[] bytes, int start, int len) throws IOException { + out.writeInt(len); + if (len > 0) { + out.write(bytes, start, len); + } + } + + @Override + public void writeBytes(ByteBuffer bytes) throws IOException { + int num = bytes.remaining(); + out.writeInt(num); + + if (num > 0) { + 
writeFixed(bytes); + } + } + + // -------------------------------------------------------------------------------------------- + // strings + // -------------------------------------------------------------------------------------------- + + @Override + public void writeString(String str) throws IOException { + byte[] bytes = Utf8.getBytesFor(str); + writeBytes(bytes, 0, bytes.length); + } + + @Override + public void writeString(Utf8 utf8) throws IOException { + writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); + + } + + // -------------------------------------------------------------------------------------------- + // collection types + // -------------------------------------------------------------------------------------------- + + @Override + public void writeArrayStart() {} + + @Override + public void setItemCount(long itemCount) throws IOException { + if (itemCount > 0) { + writeVarLongCount(out, itemCount); + } + } + + @Override + public void startItem() {} + + @Override + public void writeArrayEnd() throws IOException { + // write a single byte 0, shortcut for a var-length long of 0 + out.write(0); + } + + @Override + public void writeMapStart() {} + + @Override + public void writeMapEnd() throws IOException { + // write a single byte 0, shortcut for a var-length long of 0 + out.write(0); + } + + // -------------------------------------------------------------------------------------------- + // union + // -------------------------------------------------------------------------------------------- + + @Override + public void writeIndex(int unionIndex) throws IOException { + out.writeInt(unionIndex); + } + + // -------------------------------------------------------------------------------------------- + // utils + // -------------------------------------------------------------------------------------------- + + + public static void writeVarLongCount(DataOutput out, long val) throws IOException { + if (val < 0) { + throw new IOException("Illegal count 
(must be non-negative): " + val); + } + + while ((val & ~0x7FL) != 0) { + out.write(((int) val) | 0x80); + val >>>= 7; + } + out.write((int) val); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java new file mode 100644 index 0000000..709c4f1 --- /dev/null +++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.avro; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.avro.file.SeekableInput; +import org.apache.flink.core.fs.FSDataInputStream; + + +/** + * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well) + * + * The wrapper keeps track of the position in the data stream. 
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+
+	/** The wrapped stream; reads, seeks and close are delegated to it. */
+	private final FSDataInputStream stream;
+
+	/** Position in the stream as tracked by this wrapper (updated by read and seek). */
+	private long pos;
+
+	/** Total length reported by {@link #length()}, as given to the constructor. */
+	private final long len;
+
+	/**
+	 * Wraps the given stream, starting the tracked position at 0.
+	 *
+	 * @param stream the stream to read from
+	 * @param len the length to report from {@link #length()}
+	 */
+	public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+		this.stream = stream;
+		this.pos = 0;
+		this.len = len;
+	}
+
+	/** Returns the length passed to the constructor. */
+	@Override
+	public long length() throws IOException {
+		return this.len;
+	}
+
+	/**
+	 * Reads up to {@code len} bytes into {@code b} and advances the tracked position by the
+	 * number of bytes actually read.
+	 *
+	 * @return the value returned by the wrapped stream's read call
+	 */
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		final int read = stream.read(b, off, len);
+		// A non-positive result (e.g. the -1 that InputStream-style reads use for end of
+		// stream) consumed nothing, so it must not move the tracked position.
+		if (read > 0) {
+			pos += read;
+		}
+		return read;
+	}
+
+	/** Seeks the wrapped stream to {@code p} and records it as the tracked position. */
+	@Override
+	public void seek(long p) throws IOException {
+		stream.seek(p);
+		pos = p;
+	}
+
+	/** Returns the tracked position. */
+	@Override
+	public long tell() throws IOException {
+		return pos;
+	}
+
+	/** Closes the wrapped stream. */
+	@Override
+	public void close() throws IOException {
+		stream.close();
+	}
+}
